jsocket.cpp 173 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452445344544455445644574458445944604461446244634464446544664467446844694470447144724473447444754476447744784479448044814482448344844485448644874488448944904491449244934494449544964497449844994500450145024503450445054506450745084509451045114512451345144515451645174518451945204521452245234524452545264527452845294530453145324533453445354536453745384539454045414542454345444545454645474548454945504551455245534554455545564557455845594560456145624563456445654566456745684569457045714572457345744575457645774578457945804581458245834584458545864587458845894590459145924593459445954596459745984599460046014602460346044605460646074608460946104611461246134614461546164617461846194620462146224623462446254626462746284629463046314632463346344635463646374638463946404641464246434644464546464647464846494650465146524653465446554656465746584659466046614662466346644665466646674668466946704671467246734674467546764677467846794680468146824683468446854686468746884689469046914692469346944695469646974698469947004701470247034704470547064707470847094710471147124713471447154716471747184719472047214722472347244725472647274728472947304731473247334734473547364737473847394740474147424743474447454746474747484749475047514752475347544755475647574758475947604761476247634764476547664767476847694770477147724773477447754776477
74778477947804781478247834784478547864787478847894790479147924793479447954796479747984799480048014802480348044805480648074808480948104811481248134814481548164817481848194820482148224823482448254826482748284829483048314832483348344835483648374838483948404841484248434844484548464847484848494850485148524853485448554856485748584859486048614862486348644865486648674868486948704871487248734874487548764877487848794880488148824883488448854886488748884889489048914892489348944895489648974898489949004901490249034904490549064907490849094910491149124913491449154916491749184919492049214922492349244925492649274928492949304931493249334934493549364937493849394940494149424943494449454946494749484949495049514952495349544955495649574958495949604961496249634964496549664967496849694970497149724973497449754976497749784979498049814982498349844985498649874988498949904991499249934994499549964997499849995000500150025003500450055006500750085009501050115012501350145015501650175018501950205021502250235024502550265027502850295030503150325033503450355036503750385039504050415042504350445045504650475048504950505051505250535054505550565057505850595060506150625063506450655066506750685069507050715072507350745075507650775078507950805081508250835084508550865087508850895090509150925093509450955096509750985099510051015102510351045105510651075108510951105111511251135114511551165117511851195120512151225123512451255126512751285129513051315132513351345135513651375138513951405141514251435144514551465147514851495150515151525153515451555156515751585159516051615162516351645165516651675168516951705171517251735174517551765177517851795180518151825183518451855186518751885189519051915192519351945195519651975198519952005201520252035204520552065207520852095210521152125213521452155216521752185219522052215222522352245225522652275228522952305231523252335234523552365237523852395240524152425243524452455246524752485249525052515252525352545255525652575258525952605261526252635264526552665267526852695270527152725273527452755276527
75278527952805281528252835284528552865287528852895290529152925293529452955296529752985299530053015302530353045305530653075308530953105311531253135314531553165317531853195320532153225323532453255326532753285329533053315332533353345335533653375338533953405341534253435344534553465347534853495350535153525353535453555356535753585359536053615362536353645365536653675368536953705371537253735374537553765377537853795380538153825383538453855386538753885389539053915392539353945395539653975398539954005401540254035404540554065407540854095410541154125413541454155416541754185419542054215422542354245425542654275428542954305431543254335434543554365437543854395440544154425443544454455446544754485449545054515452545354545455545654575458545954605461546254635464546554665467546854695470547154725473547454755476547754785479548054815482548354845485548654875488548954905491549254935494549554965497549854995500550155025503550455055506550755085509551055115512551355145515551655175518551955205521552255235524552555265527552855295530553155325533553455355536553755385539554055415542554355445545554655475548554955505551555255535554555555565557555855595560556155625563556455655566556755685569557055715572557355745575557655775578557955805581558255835584558555865587558855895590559155925593559455955596559755985599560056015602560356045605560656075608560956105611561256135614561556165617561856195620562156225623562456255626562756285629563056315632563356345635563656375638563956405641564256435644564556465647564856495650565156525653565456555656565756585659566056615662566356645665566656675668566956705671567256735674567556765677567856795680568156825683568456855686568756885689569056915692569356945695569656975698569957005701570257035704570557065707570857095710571157125713571457155716571757185719572057215722572357245725572657275728572957305731573257335734573557365737573857395740574157425743574457455746574757485749575057515752575357545755575657575758575957605761576257635764576557665767576857695770577157725773577457755776577
7577857795780578157825783578457855786578757885789579057915792579357945795579657975798579958005801580258035804580558065807580858095810581158125813581458155816581758185819582058215822582358245825582658275828582958305831583258335834583558365837583858395840584158425843584458455846584758485849585058515852585358545855585658575858585958605861586258635864586558665867586858695870587158725873587458755876587758785879588058815882588358845885588658875888588958905891589258935894589558965897589858995900590159025903590459055906590759085909591059115912591359145915591659175918591959205921592259235924592559265927592859295930593159325933593459355936593759385939594059415942594359445945594659475948594959505951595259535954595559565957595859595960596159625963596459655966596759685969597059715972597359745975597659775978597959805981598259835984
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. // New IPv6 Version - IN PROGRESS
  14. /*
  15. TBD IPv6 connect
  16. multicast
  17. look at loopback
  18. */
  19. #include "platform.h"
  20. #ifdef _VER_C5
  21. #include <clwclib.h>
  22. #else
  23. #include "platform.h"
  24. #include <stdio.h>
  25. #endif
  26. #include <algorithm>
  27. #ifdef _WIN32
  28. #define WIN32_LEAN_AND_MEAN
  29. #include <windows.h>
  30. #include <winsock2.h>
  31. #include <ws2tcpip.h>
  32. #include <signal.h>
  33. #else
  34. #include <sys/types.h>
  35. #include <sys/socket.h>
  36. #include <netinet/tcp.h>
  37. #include <netinet/in.h>
  38. #include <arpa/inet.h>
  39. #include <stddef.h>
  40. #include <errno.h>
  41. #include <net/if.h>
  42. #endif
  43. #include <limits.h>
  44. #include "jmutex.hpp"
  45. #include "jsocket.hpp"
  46. #include "jexcept.hpp"
  47. #include "jio.hpp"
  48. #include "jmisc.hpp"
  49. #include "jthread.hpp"
  50. #include "jqueue.tpp"
  51. #include "jtime.hpp"
  52. #include "jprop.hpp"
  53. #include "jregexp.hpp"
  54. #include "jdebug.hpp"
  55. #include "build-config.h"
  56. // epoll only with linux
  57. #ifndef __linux__
  58. # undef _HAS_EPOLL_SUPPORT
  59. #else
  60. # define _HAS_EPOLL_SUPPORT
  61. # ifdef _HAS_EPOLL_SUPPORT
  62. # include <unistd.h>
  63. # include <sys/epoll.h>
  64. //# define EPOLLTRACE
  65. # endif
  66. #endif
  67. // various options (compile-time switches; each is a plain #define tested with #ifdef or used as a constant)
  68. #define CONNECT_TIMEOUT_REFUSED_WAIT 1000 // maximum to sleep on connect_timeout (presumably milliseconds - TODO confirm)
  69. #define TRACE_SLOW_BLOCK_TRANSFER
  70. #define DEFAULT_CONNECT_TIME (100*1000) // for connect_wait
  71. #ifndef _WIN32
  72. #define BLOCK_POLLED_SINGLE_CONNECTS // NB this is much slower in windows, so only defined for non-Windows builds
  73. #define CENTRAL_NODE_RANDOM_DELAY
  74. #else
  75. #define USERECVSEM // to singlethread BF_SYNC_TRANSFER_PUSH (Windows builds only)
  76. #endif
  77. #ifdef _DEBUG
  78. //#define SOCKTRACE
  79. //#define EPOLLTRACE
  80. #endif
  81. #ifdef _TESTING
  82. #define _TRACE // _TESTING builds enable _TRACE, which is tested by the #ifdef _TRACE that follows
  83. #endif
  // Exception-throwing and error-logging helper macros; the variant is selected by _TRACE.
  //
  // With _TRACE defined:
  //   THROWJSOCKEXCEPTION(exc)  - throws a heap-allocated SocketException whose message records
  //                               `tracename`, __FILE__ and __LINE__. A variable named `tracename`
  //                               must therefore be in scope wherever the macro is expanded.
  //   THROWJSOCKEXCEPTION2(exc) - same, but the message records only __FILE__ and __LINE__.
  //   LOGERR(err,ref,info)      - calls LogErr with __LINE__ and a NULL tracename.
  //   LOGERR2(err,ref,info)     - calls LogErr with __LINE__ and the in-scope `tracename`.
  // Both throw macros expand to a brace-enclosed block, not a single expression.
  84. #ifdef _TRACE
  85. #define THROWJSOCKEXCEPTION(exc) \
  86. { StringBuffer msg; \
  87. msg.appendf("Target: %s, Raised in: %s, line %d",tracename ,__FILE__, __LINE__); \
  88. IJSOCK_Exception *e = new SocketException(exc,msg.str());\
  89. throw e; }
  90. #define THROWJSOCKEXCEPTION2(exc) \
  91. { StringBuffer msg; \
  92. msg.appendf("Raised in: %s, line %d",__FILE__, __LINE__); \
  93. IJSOCK_Exception *e = new SocketException(exc,msg.str());\
  94. throw e; }
  95. #define LOGERR(err,ref,info) LogErr(err,ref,info,__LINE__,NULL)
  96. #define LOGERR2(err,ref,info) LogErr(err,ref,info,__LINE__,tracename)
  97. #else
  // Without _TRACE: the exception is constructed from the error code alone (no message text),
  // THROWJSOCKEXCEPTION2 is an alias for THROWJSOCKEXCEPTION, and LOGERR/LOGERR2 expand to
  // nothing, so their arguments are not evaluated.
  98. #define THROWJSOCKEXCEPTION(exc) \
  99. { IJSOCK_Exception *e = new SocketException(exc);\
  100. throw e; }
  101. #define THROWJSOCKEXCEPTION2(exc) THROWJSOCKEXCEPTION(exc)
  102. #define LOGERR(err,ref,info)
  103. #define LOGERR2(err,ref,info)
  104. #endif
  // File-scope state for the socket layer.
  105. JSocketStatistics STATS; // not declared static, so it has external linkage
  106. static bool IP4only=false; // slightly faster if we know no IPv6
  107. static bool IP6preferred=false; // e.g. for DNS and socket create
  108. IpSubNet PreferredSubnet(NULL,NULL); // set this if you prefer a particular subnet for debugging etc
  109. // e.g. PreferredSubnet("192.168.16.0", "255.255.255.0")
  110. static atomic_t pre_conn_unreach_cnt = ATOMIC_INIT(0); // global count of pre_connect() ENETUNREACH errors
  111. #define IPV6_SERIALIZE_PREFIX (0x00ff00ff) // NOTE(review): presumably a marker written ahead of serialized IPv6 addresses - confirm against the serialize code
  112. inline void LogErr(unsigned err,unsigned ref,const char *info,unsigned lineno,const char *tracename)
  113. {
  114. if (err)
  115. PROGLOG("jsocket(%d,%d)%s%s err = %d%s%s",ref,lineno,
  116. (info&&*info)?" ":"",(info&&*info)?info:"",err,
  117. (tracename&&*tracename)?" : ":"",(tracename&&*tracename)?tracename:"");
  118. }
  119. class jlib_thrown_decl SocketException: public CInterface, public IJSOCK_Exception
  120. {
  121. public:
  122. IMPLEMENT_IINTERFACE;
  123. SocketException(int code,const char *_msg=NULL) : errcode(code)
  124. {
  125. if (_msg)
  126. msg = strdup(_msg);
  127. else
  128. msg = NULL;
  129. };
  130. ~SocketException() { free(msg); }
  131. int errorCode() const { return errcode; }
  132. static StringBuffer & geterrormessage(int err,StringBuffer &str)
  133. {
  134. switch (err) {
  135. case JSOCKERR_ok: return str.append("ok");
  136. case JSOCKERR_not_opened: return str.append("socket not opened");
  137. case JSOCKERR_bad_address: return str.append("bad address");
  138. case JSOCKERR_connection_failed: return str.append("connection failed");
  139. case JSOCKERR_broken_pipe: return str.append("connection is broken");
  140. case JSOCKERR_graceful_close: return str.append("connection closed other end");
  141. case JSOCKERR_invalid_access_mode: return str.append("invalid access mode");
  142. case JSOCKERR_timeout_expired: return str.append("timeout expired");
  143. case JSOCKERR_port_in_use: return str.append("port in use");
  144. case JSOCKERR_cancel_accept: return str.append("cancel accept");
  145. case JSOCKERR_connectionless_socket: return str.append("connectionless socket");
  146. case JSOCKERR_handle_too_large: return str.append("handle too large");
  147. case JSOCKERR_bad_netaddr: return str.append("bad net addr");
  148. case JSOCKERR_ipv6_not_implemented: return str.append("IPv6 not implemented");
  149. // OS errors
  150. #ifdef _WIN32
  151. case WSAEINTR: return str.append("WSAEINTR(10004) - Interrupted system call.");
  152. case WSAEBADF: return str.append("WSAEBADF(10009) - Bad file number.");
  153. case WSAEACCES: return str.append("WSAEACCES(10013) - Permission denied.");
  154. case WSAEFAULT: return str.append("WSAEFAULT(10014) - Bad address.");
  155. case WSAEINVAL: return str.append("WSAEINVAL(10022) - Invalid argument.");
  156. case WSAEMFILE: return str.append("WSAEMFILE(10024) - Too many open files.");
  157. case WSAEWOULDBLOCK: return str.append("WSAEWOULDBLOCK(10035) - Operation would block.");
  158. case WSAEINPROGRESS: return str.append("WSAEINPROGRESS(10036) - Operation now in progress.");
  159. case WSAEALREADY: return str.append("WSAEALREADY(10037) - Operation already in progress.");
  160. case WSAENOTSOCK: return str.append("WSAENOTSOCK(10038) - Socket operation on nonsocket.");
  161. case WSAEDESTADDRREQ: return str.append("WSAEDESTADDRREQ(10039) - Destination address required.");
  162. case WSAEMSGSIZE: return str.append("WSAEMSGSIZE(10040) - Message too long.");
  163. case WSAEPROTOTYPE: return str.append("WSAEPROTOTYPE(10041) - Protocol wrong type for socket.");
  164. case WSAENOPROTOOPT: return str.append("WSAENOPROTOOPT(10042) - Protocol not available.");
  165. case WSAEPROTONOSUPPORT: return str.append("WSAEPROTONOSUPPORT(10043) - Protocol not supported.");
  166. case WSAESOCKTNOSUPPORT: return str.append("WSAESOCKTNOSUPPORT(10044) - Socket type not supported.");
  167. case WSAEOPNOTSUPP: return str.append("WSAEOPNOTSUPP(10045) - Operation not supported on socket.");
  168. case WSAEPFNOSUPPORT: return str.append("WSAEPFNOSUPPORT(10046) - Protocol family not supported.");
  169. case WSAEAFNOSUPPORT: return str.append("WSAEAFNOSUPPORT(10047) - Address family not supported by protocol family.");
  170. case WSAEADDRINUSE: return str.append("WSAEADDRINUSE(10048) - Address already in use.");
  171. case WSAEADDRNOTAVAIL: return str.append("WSAEADDRNOTAVAIL(10049) - Cannot assign requested address.");
  172. case WSAENETDOWN: return str.append("WSAENETDOWN(10050) - Network is down.");
  173. case WSAENETUNREACH: return str.append("WSAENETUNREACH(10051) - Network is unreachable.");
  174. case WSAENETRESET: return str.append("WSAENETRESET(10052) - Network dropped connection on reset.");
  175. case WSAECONNABORTED: return str.append("WSAECONNABORTED(10053) - Software caused connection abort.");
  176. case WSAECONNRESET: return str.append("WSAECONNRESET(10054) - Connection reset by peer.");
  177. case WSAENOBUFS: return str.append("WSAENOBUFS(10055) - No buffer space available.");
  178. case WSAEISCONN: return str.append("WSAEISCONN(10056) - Socket is already connected.");
  179. case WSAENOTCONN: return str.append("WSAENOTCONN(10057) - Socket is not connected.");
  180. case WSAESHUTDOWN: return str.append("WSAESHUTDOWN(10058) - Cannot send after socket shutdown.");
  181. case WSAETOOMANYREFS: return str.append("WSAETOOMANYREFS(10059) - Too many references: cannot splice.");
  182. case WSAETIMEDOUT: return str.append("WSAETIMEDOUT(10060) - Connection timed out.");
  183. case WSAECONNREFUSED: return str.append("WSAECONNREFUSED(10061) - Connection refused.");
  184. case WSAELOOP: return str.append("WSAELOOP(10062) - Too many levels of symbolic links.");
  185. case WSAENAMETOOLONG: return str.append("WSAENAMETOOLONG(10063) - File name too long.");
  186. case WSAEHOSTDOWN: return str.append("WSAEHOSTDOWN(10064) - Host is down.");
  187. case WSAEHOSTUNREACH: return str.append("WSAEHOSTUNREACH(10065) - No route to host.");
  188. case WSASYSNOTREADY: return str.append("WSASYSNOTREADY(10091) - The network subsystem is unusable.");
  189. case WSAVERNOTSUPPORTED: return str.append("WSAVERNOTSUPPORTED(10092) - The Windows Sockets DLL cannot support this application.");
  190. case WSANOTINITIALISED: return str.append("WSANOTINITIALISED(10093) - Winsock not initialized.");
  191. case WSAEDISCON: return str.append("WSAEDISCON(10101) - Disconnect.");
  192. case WSAHOST_NOT_FOUND: return str.append("WSAHOST_NOT_FOUND(11001) - Host not found.");
  193. case WSATRY_AGAIN: return str.append("WSATRY_AGAIN(11002) - Nonauthoritative host not found.");
  194. case WSANO_RECOVERY: return str.append("WSANO_RECOVERY(11003) - Nonrecoverable error.");
  195. case WSANO_DATA: return str.append("WSANO_DATA(11004) - Valid name, no data record of requested type.");
  196. #else
  197. case ENOTSOCK: return str.append("ENOTSOCK - Socket operation on non-socket ");
  198. case EDESTADDRREQ: return str.append("EDESTADDRREQ - Destination address required ");
  199. case EMSGSIZE: return str.append("EMSGSIZE - Message too long ");
  200. case EPROTOTYPE: return str.append("EPROTOTYPE - Protocol wrong type for socket ");
  201. case ENOPROTOOPT: return str.append("ENOPROTOOPT - Protocol not available ");
  202. case EPROTONOSUPPORT: return str.append("EPROTONOSUPPORT - Protocol not supported ");
  203. case ESOCKTNOSUPPORT: return str.append("ESOCKTNOSUPPORT - Socket type not supported ");
  204. case EOPNOTSUPP: return str.append("EOPNOTSUPP - Operation not supported on socket ");
  205. case EPFNOSUPPORT: return str.append("EPFNOSUPPORT - Protocol family not supported ");
  206. case EAFNOSUPPORT: return str.append("EAFNOSUPPORT - Address family not supported by protocol family ");
  207. case EADDRINUSE: return str.append("EADDRINUSE - Address already in use ");
  208. case EADDRNOTAVAIL: return str.append("EADDRNOTAVAIL - Can't assign requested address ");
  209. case ENETDOWN: return str.append("ENETDOWN - Network is down ");
  210. case ENETUNREACH: return str.append("ENETUNREACH - Network is unreachable ");
  211. case ENETRESET: return str.append("ENETRESET - Network dropped connection because of reset ");
  212. case ECONNABORTED: return str.append("ECONNABORTED - Software caused connection abort ");
  213. case ECONNRESET: return str.append("ECONNRESET - Connection reset by peer ");
  214. case ENOBUFS: return str.append("ENOBUFS - No buffer space available ");
  215. case EISCONN: return str.append("EISCONN - Socket is already connected ");
  216. case ENOTCONN: return str.append("ENOTCONN - Socket is not connected ");
  217. case ESHUTDOWN: return str.append("ESHUTDOWN - Can't send after socket shutdown ");
  218. case ETOOMANYREFS: return str.append("ETOOMANYREFS - Too many references: can't splice ");
  219. case ETIMEDOUT: return str.append("ETIMEDOUT - Connection timed out ");
  220. case ECONNREFUSED: return str.append("ECONNREFUSED - Connection refused ");
  221. case EHOSTDOWN: return str.append("EHOSTDOWN - Host is down ");
  222. case EHOSTUNREACH: return str.append("EHOSTUNREACH - No route to host ");
  223. case EWOULDBLOCK: return str.append("EWOULDBLOCK - operation already in progress");
  224. case EINPROGRESS: return str.append("EINPROGRESS - operation now in progress ");
  225. #endif
  226. }
  227. IException *ose = MakeOsException(err);
  228. ose->errorMessage(str);
  229. ose->Release();
  230. return str;
  231. }
  232. StringBuffer & errorMessage(StringBuffer &str) const
  233. {
  234. if (msg)
  235. return geterrormessage(errcode,str).append('\n').append(msg);
  236. return geterrormessage(errcode,str);
  237. }
  238. MessageAudience errorAudience() const
  239. {
  240. switch (errcode) {
  241. case JSOCKERR_port_in_use:
  242. return MSGAUD_operator;
  243. }
  244. return MSGAUD_user;
  245. }
  246. private:
  247. int errcode;
  248. char *msg;
  249. };
  250. IJSOCK_Exception *IPv6NotImplementedException(const char *filename,unsigned lineno)
  251. {
  252. StringBuffer msg;
  253. msg.appendf("%s(%d)",filename,lineno);
  254. return new SocketException(JSOCKERR_ipv6_not_implemented,msg.str());
  255. }
  256. struct MCASTREQ
  257. {
  258. struct in_addr imr_multiaddr; /* multicast group to join */
  259. struct in_addr imr_interface; /* interface to join on */
  260. MCASTREQ(const char *mcip)
  261. {
  262. imr_multiaddr.s_addr = inet_addr(mcip);
  263. imr_interface.s_addr = htonl(INADDR_ANY);
  264. }
  265. };
  266. #ifdef __APPLE__
  267. #ifndef MSG_NOSIGNAL
  268. #define MSG_NOSIGNAL 0x4000
  269. #endif
  270. #endif
  271. #if defined( _WIN32)
  272. #define T_SOCKET SOCKET
  273. #define T_FD_SET fd_set
  274. #define XFD_SETSIZE FD_SETSIZE
  275. #define ETIMEDOUT WSAETIMEDOUT
  276. #define ECONNREFUSED WSAECONNREFUSED
  277. #define XFD_ZERO(s) FD_ZERO(s)
  278. #define SEND_FLAGS 0
  279. #define BADSOCKERR(err) ((err==WSAEBADF)||(err==WSAENOTSOCK))
  280. #define CHECKSOCKRANGE(s)
  281. #elif defined(__FreeBSD__) || defined(__APPLE__)
  282. #define XFD_SETSIZE FD_SETSIZE
  283. #define T_FD_SET fd_set
  284. #define XFD_ZERO(s) FD_ZERO(s)
  285. #define T_SOCKET int
  286. #define SEND_FLAGS (MSG_NOSIGNAL)
  287. #define BADSOCKERR(err) ((err==EBADF)||(err==ENOTSOCK))
  288. #define CHECKSOCKRANGE(s)
  289. #else
  290. #define XFD_SETSIZE 32768
  291. struct xfd_set { __fd_mask fds_bits[XFD_SETSIZE / __NFDBITS]; }; // define our own
  292. // linux 64 bit
  293. #ifdef __linux__
  294. #ifdef __x86_64__
  295. #undef __FDMASK
  296. #define __FDMASK(d) (1UL << ((d) % __NFDBITS))
  297. #undef __FDELT
  298. #define __FDELT(d) ((d) / __NFDBITS)
  299. #undef __FD_SET
  300. #define __FD_SET(d, s) (__FDS_BITS (s)[__FDELT(d)] |= __FDMASK(d))
  301. #undef __FD_ISSET
  302. #define __FD_ISSET(d, s) ((__FDS_BITS (s)[__FDELT(d)] & __FDMASK(d)) != 0)
  303. #endif
  304. #define CHECKSOCKRANGE(s) { if (s>=XFD_SETSIZE) THROWJSOCKEXCEPTION2(JSOCKERR_handle_too_large); }
  305. #endif
  306. // end 64 bit
  307. #define T_FD_SET xfd_set
  308. #define XFD_ZERO(s) memset(s,0,sizeof(xfd_set))
  309. #define T_SOCKET int
  310. #define SEND_FLAGS (MSG_NOSIGNAL)
  311. #define BADSOCKERR(err) ((err==EBADF)||(err==ENOTSOCK))
  312. #endif
  313. #ifdef CENTRAL_NODE_RANDOM_DELAY
  314. static SocketEndpointArray CentralNodeArray;
  315. #endif
  316. enum SOCKETMODE { sm_tcp_server, sm_tcp, sm_udp_server, sm_udp, sm_multicast_server, sm_multicast};
  317. class CSocket: public CInterface, public ISocket
  318. {
  319. public:
  320. IMPLEMENT_IINTERFACE;
  321. static CriticalSection crit;
  322. protected:
  323. friend class CSocketConnectWait;
  324. enum { ss_open, ss_shutdown, ss_close, ss_pre_open } state;
  325. T_SOCKET sock;
  326. char* hostname; // host address
  327. unsigned short hostport; // host port
  328. SOCKETMODE sockmode;
  329. IpAddress targetip;
  330. SocketEndpoint returnep; // set by set_return_addr
  331. MCASTREQ * mcastreq;
  332. size32_t nextblocksize;
  333. unsigned blockflags;
  334. unsigned blocktimeoutms;
  335. bool owned;
  336. enum {accept_not_cancelled, accept_cancel_pending, accept_cancelled} accept_cancel_state;
  337. bool in_accept;
  338. bool nonblocking;
  339. bool nagling;
  340. static unsigned connectingcount;
  341. #ifdef USERECVSEM
  342. static Semaphore receiveblocksem;
  343. bool receiveblocksemowned; // owned by this socket
  344. #endif
  345. #ifdef _TRACE
  346. char * tracename;
  347. #endif
  348. public:
  349. void open(int listen_queue_size,bool reuseports=false);
  350. bool connect_timeout( unsigned timeout, bool noexception);
  351. void connect_wait( unsigned timems);
  352. void udpconnect();
  353. void read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,unsigned timeoutsecs);
  354. void readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read, unsigned timedelaysecs);
  355. void read(void* buf, size32_t size);
  356. size32_t write(void const* buf, size32_t size);
  357. size32_t write_multiple(unsigned num,void const**buf, size32_t *size);
  358. size32_t udp_write_to(SocketEndpoint &ep,void const* buf, size32_t size);
  359. void close();
  360. void errclose();
  361. bool connectionless() { return (sockmode!=sm_tcp)&&(sockmode!=sm_tcp_server); }
  362. void shutdown(unsigned mode);
  363. ISocket* accept(bool allowcancel);
  364. int wait_read(unsigned timeout);
  365. int wait_write(unsigned timeout);
  366. int name(char *name,size32_t namemax);
  367. int peer_name(char *name,size32_t namemax);
  368. SocketEndpoint &getPeerEndpoint(SocketEndpoint &ep);
  369. IpAddress & getPeerAddress(IpAddress &addr);
  370. void set_return_addr(int port,const char *name); // sets returnep
  371. void cancel_accept();
  372. size32_t get_max_send_size();
  373. bool set_nonblock(bool on=true);
  374. bool set_nagle(bool on);
  375. void set_linger(int lingersecs);
  376. void set_keep_alive(bool set);
  377. virtual void set_inherit(bool inherit=false);
  378. virtual bool check_connection();
  379. // Block functions
  380. void set_block_mode(unsigned flags,size32_t recsize=0,unsigned timeoutms=0);
  381. bool send_block(const void *blk,size32_t sz);
  382. size32_t receive_block_size();
  383. size32_t receive_block(void *blk,size32_t sz);
  384. size32_t get_send_buffer_size();
  385. void set_send_buffer_size(size32_t sz);
  386. bool join_multicast_group(SocketEndpoint &ep); // for udp multicast
  387. bool leave_multicast_group(SocketEndpoint &ep); // for udp multicast
  388. size32_t get_receive_buffer_size();
  389. void set_receive_buffer_size(size32_t sz);
  390. size32_t avail_read();
  391. int pre_connect(bool block);
  392. int post_connect();
  393. CSocket(const SocketEndpoint &_ep,SOCKETMODE smode,const char *name);
  394. CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned);
  395. virtual ~CSocket();
  396. unsigned OShandle()
  397. {
  398. return (unsigned)sock;
  399. }
  400. private:
  401. int closesock()
  402. {
  403. if (sock!=INVALID_SOCKET) {
  404. T_SOCKET s = sock;
  405. sock = INVALID_SOCKET;
  406. STATS.activesockets--;
  407. #ifdef SOCKTRACE
  408. PROGLOG("SOCKTRACE: Closing socket %x %d (%x)", s, s, this);
  409. #endif
  410. #ifdef _WIN32
  411. return ::closesocket(s);
  412. #else
  413. return ::close(s);
  414. #endif
  415. }
  416. else
  417. return 0;
  418. }
  419. };
  420. CriticalSection CSocket::crit;
  421. unsigned CSocket::connectingcount=0;
  422. #ifdef USERECVSEM
  423. Semaphore CSocket::receiveblocksem(2);
  424. #endif
  425. #ifdef _WIN32
  426. class win_socket_library
  427. {
  428. static bool initdone; // to prevent dependancy probs very early on (e.g. jlog)
  429. public:
  430. win_socket_library() { init(); }
  431. bool init()
  432. {
  433. if (initdone)
  434. return true;
  435. WSADATA wsa;
  436. if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
  437. if (WSAStartup(MAKEWORD(1, 1), &wsa) != 0) {
  438. MessageBox(NULL,"Failed to initialize windows sockets","JLib Socket Error",MB_OK);
  439. return false;
  440. }
  441. }
  442. initdone = true;
  443. return true;
  444. }
  445. ~win_socket_library()
  446. {
  447. WSACleanup();
  448. }
  449. };
  450. bool win_socket_library::initdone = false;
  451. static win_socket_library ws32_lib;
  452. #define ERRNO() WSAGetLastError()
  453. #define EADDRINUSE WSAEADDRINUSE
  454. #define EINTRCALL WSAEINTR
  455. #define ECONNRESET WSAECONNRESET
  456. #define ECONNABORTED WSAECONNABORTED
  457. #define ENOTCONN WSAENOTCONN
  458. #define EWOULDBLOCK WSAEWOULDBLOCK
  459. #define EINPROGRESS WSAEINPROGRESS
  460. #define ENETUNREACH WSAENETUNREACH
  461. #define ENOTSOCK WSAENOTSOCK
  462. struct j_sockaddr_in6 {
  463. short sin6_family; /* AF_INET6 */
  464. u_short sin6_port; /* Transport level port number */
  465. u_long sin6_flowinfo; /* IPv6 flow information */
  466. struct in_addr6 sin6_addr; /* IPv6 address */
  467. u_long sin6_scope_id; /* set of interfaces for a scope */
  468. };
  469. typedef union {
  470. struct sockaddr sa;
  471. struct j_sockaddr_in6 sin6;
  472. struct sockaddr_in sin;
  473. } J_SOCKADDR;
  474. #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
  475. static int _inet_pton(int af, const char* src, void* dst)
  476. {
  477. DEFINE_SOCKADDR(u);
  478. int address_length;
  479. switch (af) {
  480. case AF_INET:
  481. u.sin.sin_family = AF_INET;
  482. address_length = sizeof (u.sin);
  483. break;
  484. case AF_INET6:
  485. u.sin6.sin6_family = AF_INET6;
  486. address_length = sizeof (u.sin6);
  487. break;
  488. default:
  489. #ifdef EAFNOSUPPORT
  490. errno = EAFNOSUPPORT;
  491. #else
  492. errno = 52;
  493. #endif
  494. return -1;
  495. }
  496. ws32_lib.init();
  497. int ret = WSAStringToAddress ((LPTSTR) src, af, NULL, &u.sa, &address_length);
  498. if (ret == 0) {
  499. switch (af) {
  500. case AF_INET:
  501. memcpy (dst, &u.sin.sin_addr, sizeof (struct in_addr));
  502. break;
  503. case AF_INET6:
  504. memcpy (dst, &u.sin6.sin6_addr, sizeof (u.sin6.sin6_addr));
  505. break;
  506. }
  507. return 1;
  508. }
  509. errno = WSAGetLastError();
  510. // PROGLOG("errno = %d",errno);
  511. return 0;
  512. }
  513. static const char * _inet_ntop (int af, const void *src, char *dst, socklen_t cnt)
  514. {
  515. /* struct sockaddr can't accomodate struct sockaddr_in6. */
  516. DEFINE_SOCKADDR(u);
  517. DWORD dstlen = cnt;
  518. size_t srcsize;
  519. memset(&u,0,sizeof(u));
  520. switch (af) {
  521. case AF_INET:
  522. u.sin.sin_family = AF_INET;
  523. u.sin.sin_addr = *(struct in_addr *) src;
  524. srcsize = sizeof (u.sin);
  525. break;
  526. case AF_INET6:
  527. u.sin6.sin6_family = AF_INET6;
  528. memcpy(&u.sin6.sin6_addr,src,sizeof(in_addr6));
  529. srcsize = sizeof (u.sin6);
  530. break;
  531. default:
  532. return NULL;
  533. }
  534. ws32_lib.init();
  535. if (WSAAddressToString (&u.sa, srcsize, NULL, dst, &dstlen) != 0) {
  536. errno = WSAGetLastError();
  537. return NULL;
  538. }
  539. return (const char *) dst;
  540. }
  541. int inet_aton (const char *name, struct in_addr *addr)
  542. {
  543. addr->s_addr = inet_addr (name);
  544. return (addr->s_addr == (u_long)-1)?1:0; // 255.255.255.255 has had it here
  545. }
  546. #else
  547. #define _inet_ntop inet_ntop
  548. #define _inet_pton inet_pton
  549. #define in_addr6 in6_addr
  550. typedef union {
  551. struct sockaddr sa;
  552. struct sockaddr_in6 sin6;
  553. struct sockaddr_in sin;
  554. } J_SOCKADDR;
  555. #define DEFINE_SOCKADDR(name) J_SOCKADDR name; memset(&name,0,sizeof(J_SOCKADDR))
  556. #define EINTRCALL EINTR
  557. #define ERRNO() (errno)
  558. #ifndef INADDR_NONE
  559. #define INADDR_NONE (-1)
  560. #endif
  561. #endif
  562. #ifndef INET6_ADDRSTRLEN
  563. #define INET6_ADDRSTRLEN 65
  564. #endif
  565. inline socklen_t setSockAddr(J_SOCKADDR &u, const IpAddress &ip,unsigned short port)
  566. {
  567. if (!IP6preferred) {
  568. if (ip.getNetAddress(sizeof(in_addr),&u.sin.sin_addr)==sizeof(in_addr)) {
  569. u.sin.sin_family = AF_INET;
  570. u.sin.sin_port = htons(port);
  571. return sizeof(u.sin);
  572. }
  573. }
  574. if (IP4only)
  575. IPV6_NOT_IMPLEMENTED();
  576. ip.getNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr);
  577. u.sin6.sin6_family = AF_INET6;
  578. u.sin6.sin6_port = htons(port);
  579. return sizeof(u.sin6);
  580. }
  581. inline socklen_t setSockAddrAny(J_SOCKADDR &u, unsigned short port)
  582. {
  583. if (IP6preferred) {
  584. #ifdef _WIN32
  585. IN6ADDR_SETANY((PSOCKADDR_IN6)&u.sin6.sin6_addr);
  586. #else
  587. memcpy(&u.sin6.sin6_addr,&in6addr_any,sizeof(in_addr6));
  588. #endif
  589. u.sin6.sin6_family= AF_INET6;
  590. u.sin6.sin6_port = htons(port);
  591. return sizeof(u.sin6);
  592. }
  593. u.sin.sin_addr.s_addr = htonl(INADDR_ANY);
  594. u.sin.sin_family= AF_INET;
  595. u.sin.sin_port = htons(port);
  596. return sizeof(u.sin);
  597. }
  598. inline void getSockAddrEndpoint(const J_SOCKADDR &u, socklen_t ul, SocketEndpoint &ep)
  599. {
  600. if (ul==sizeof(u.sin)) {
  601. ep.setNetAddress(sizeof(in_addr),&u.sin.sin_addr);
  602. ep.port = htons(u.sin.sin_port);
  603. }
  604. else {
  605. ep.setNetAddress(sizeof(in_addr6),&u.sin6.sin6_addr);
  606. ep.port = htons(u.sin6.sin6_port);
  607. }
  608. }
  609. /* might need fcntl(F_SETFL), or ioctl(FIONBIO) */
  610. /* Posix.1g says fcntl */
  611. #if defined(O_NONBLOCK)
  612. bool CSocket::set_nonblock(bool on)
  613. {
  614. int flags = fcntl(sock, F_GETFL, 0);
  615. if (flags == -1)
  616. return nonblocking;
  617. if (on)
  618. flags |= O_NONBLOCK;
  619. else
  620. flags &= ~O_NONBLOCK;
  621. if (fcntl(sock, F_SETFL, flags)==0) {
  622. bool wasNonBlocking = nonblocking;
  623. nonblocking = on;
  624. return wasNonBlocking;
  625. }
  626. return nonblocking;
  627. }
  628. #else
  629. bool CSocket::set_nonblock(bool on)
  630. {
  631. #ifdef _WIN32
  632. u_long yes = on?1:0;
  633. if (ioctlsocket(sock, FIONBIO, &yes)==0) {
  634. #else
  635. int yes = on?1:0;
  636. if (ioctl(sock, FIONBIO, &yes)==0) {
  637. #endif
  638. bool wasNonBlocking = nonblocking;
  639. nonblocking = on;
  640. return wasNonBlocking;
  641. }
  642. return nonblocking;
  643. }
  644. #endif
  645. bool CSocket::set_nagle(bool on)
  646. {
  647. bool ret = nagling;
  648. nagling = on;
  649. int enabled = !on;
  650. if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled)) != 0) {
  651. nagling = !on;
  652. }
  653. return ret;
  654. }
  655. void CSocket::set_inherit(bool inherit)
  656. {
  657. #ifndef _WIN32
  658. long flag = fcntl(sock, F_GETFD);
  659. if(inherit)
  660. flag &= ~FD_CLOEXEC;
  661. else
  662. flag |= FD_CLOEXEC;
  663. fcntl(sock, F_SETFD, flag);
  664. #endif
  665. }
  666. size32_t CSocket::avail_read()
  667. {
  668. #ifdef _WIN32
  669. u_long avail;
  670. if (ioctlsocket(sock, FIONREAD, &avail)==0)
  671. #else
  672. int avail;
  673. if (ioctl(sock, FIONREAD, &avail)==0)
  674. #endif
  675. return (size32_t)avail;
  676. int err = ERRNO();
  677. LOGERR2(err,1,"avail_read");
  678. return 0;
  679. }
  680. #define PRE_CONN_UNREACH_ELIM 100
  681. int CSocket::pre_connect (bool block)
  682. {
  683. assertex(hostname);
  684. DEFINE_SOCKADDR(u);
  685. if (targetip.isNull()) {
  686. set_return_addr(hostport,hostname);
  687. targetip.ipset(returnep);
  688. }
  689. socklen_t ul = setSockAddr(u,targetip,hostport);
  690. sock = ::socket(u.sa.sa_family, SOCK_STREAM, targetip.isIp4()?0:PF_INET6);
  691. owned = true;
  692. state = ss_pre_open; // will be set to open by post_connect
  693. if (sock == INVALID_SOCKET) {
  694. int err = ERRNO();
  695. THROWJSOCKEXCEPTION(err);
  696. }
  697. STATS.activesockets++;
  698. int err = 0;
  699. set_nonblock(!block);
  700. int rc = ::connect(sock, &u.sa, ul);
  701. if (rc==SOCKET_ERROR) {
  702. err = ERRNO();
  703. if ((err != EINPROGRESS)&&(err != EWOULDBLOCK)&&(err != ETIMEDOUT)&&(err!=ECONNREFUSED)) { // handled by caller
  704. if (err != ENETUNREACH) {
  705. atomic_set(&pre_conn_unreach_cnt, 0);
  706. LOGERR2(err,1,"pre_connect");
  707. } else {
  708. int ecnt = atomic_read(&pre_conn_unreach_cnt);
  709. if (ecnt <= PRE_CONN_UNREACH_ELIM) {
  710. atomic_inc(&pre_conn_unreach_cnt);
  711. LOGERR2(err,1,"pre_connect network unreachable");
  712. }
  713. }
  714. } else
  715. atomic_set(&pre_conn_unreach_cnt, 0);
  716. } else
  717. atomic_set(&pre_conn_unreach_cnt, 0);
  718. #ifdef SOCKTRACE
  719. PROGLOG("SOCKTRACE: pre-connected socket%s %x %d (%x) err=%d", block?"(block)":"", sock, sock, (int)this, err);
  720. #endif
  721. return err;
  722. }
  723. int CSocket::post_connect ()
  724. {
  725. set_nonblock(false);
  726. int err = 0;
  727. socklen_t errlen = sizeof(err);
  728. int rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  729. if ((rc!=0)&&!err)
  730. err = ERRNO(); // some implementations of getsockopt duff
  731. if (err==0) {
  732. nagling = true;
  733. set_nagle(false);
  734. state = ss_open;
  735. }
  736. else if ((err!=ETIMEDOUT)&&(err!=ECONNREFUSED)) // handled by caller
  737. LOGERR2(err,1,"post_connect");
  738. return err;
  739. }
  740. void CSocket::open(int listen_queue_size,bool reuseports)
  741. {
  742. if (IP6preferred)
  743. sock = ::socket(AF_INET6, connectionless()?SOCK_DGRAM:SOCK_STREAM, PF_INET6);
  744. else
  745. sock = ::socket(AF_INET, connectionless()?SOCK_DGRAM:SOCK_STREAM, 0);
  746. if (sock == INVALID_SOCKET) {
  747. THROWJSOCKEXCEPTION(ERRNO());
  748. }
  749. STATS.activesockets++;
  750. #ifdef SOCKTRACE
  751. PROGLOG("SOCKTRACE: opened socket %x %d (%x)", sock,sock,this);
  752. #endif
  753. if ((hostport==0)&&(sockmode==sm_udp)) {
  754. state = ss_open;
  755. #ifdef SOCKTRACE
  756. PROGLOG("SOCKTRACE: opened socket return udp");
  757. #endif
  758. set_inherit(false);
  759. return;
  760. }
  761. #ifndef _WIN32
  762. reuseports = true; // for some reason linux requires reuse ports
  763. #endif
  764. if (reuseports) {
  765. int on = 1;
  766. setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *)&on, sizeof(on));
  767. }
  768. DEFINE_SOCKADDR(u);
  769. socklen_t ul;
  770. if (hostname) {
  771. if (targetip.isNull()) {
  772. set_return_addr(hostport,hostname);
  773. targetip.ipset(returnep);
  774. }
  775. ul = setSockAddr(u,targetip,hostport);
  776. }
  777. else
  778. ul = setSockAddrAny(u,hostport);
  779. int saverr;
  780. if (::bind(sock, &u.sa, ul) != 0) {
  781. saverr = ERRNO();
  782. if (saverr==EADDRINUSE) { // don't log as error (some usages probe ports)
  783. ErrPortInUse:
  784. closesock();
  785. char msg[1024];
  786. sprintf(msg,"Target: %s, port = %d, Raised in: %s, line %d",tracename,(int)hostport,__FILE__, __LINE__);
  787. IJSOCK_Exception *e = new SocketException(JSOCKERR_port_in_use,msg);
  788. throw e;
  789. }
  790. else {
  791. closesock();
  792. THROWJSOCKEXCEPTION(saverr);
  793. }
  794. }
  795. if (!connectionless()) {
  796. if (::listen(sock, listen_queue_size) != 0) {
  797. saverr = ERRNO();
  798. if (saverr==EADDRINUSE)
  799. goto ErrPortInUse;
  800. closesock();
  801. THROWJSOCKEXCEPTION(saverr);
  802. }
  803. }
  804. if (mcastreq) {
  805. if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)mcastreq, sizeof(*mcastreq))!=0) {
  806. saverr = ERRNO();
  807. closesock();
  808. THROWJSOCKEXCEPTION(saverr);
  809. }
  810. }
  811. state = ss_open;
  812. #ifdef SOCKTRACE
  813. PROGLOG("SOCKTRACE: opened socket return");
  814. #endif
  815. set_inherit(false);
  816. }
  817. ISocket* CSocket::accept(bool allowcancel)
  818. {
  819. if ((accept_cancel_state!=accept_not_cancelled) && allowcancel) {
  820. accept_cancel_state=accept_cancelled;
  821. return NULL;
  822. }
  823. if (state != ss_open) {
  824. ERRLOG("invalid accept, state = %d",(int)state);
  825. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  826. }
  827. if (connectionless()) {
  828. THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket);
  829. }
  830. T_SOCKET newsock;
  831. loop {
  832. in_accept = true;
  833. newsock = (sock!=INVALID_SOCKET)?::accept(sock, NULL, NULL):INVALID_SOCKET;
  834. in_accept = false;
  835. #ifdef SOCKTRACE
  836. PROGLOG("SOCKTRACE: accept created socket %x %d (%x)", newsock,newsock,this);
  837. #endif
  838. if (newsock!=INVALID_SOCKET) {
  839. if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) {
  840. ::close(newsock);
  841. newsock=INVALID_SOCKET;
  842. }
  843. else {
  844. accept_cancel_state = accept_not_cancelled;
  845. break;
  846. }
  847. }
  848. int saverr;
  849. saverr = ERRNO();
  850. if ((sock==INVALID_SOCKET)||(accept_cancel_state==accept_cancel_pending)) {
  851. accept_cancel_state = accept_cancelled;
  852. if (allowcancel)
  853. return NULL;
  854. THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept);
  855. }
  856. if (saverr != EINTRCALL) {
  857. accept_cancel_state = accept_not_cancelled;
  858. THROWJSOCKEXCEPTION(saverr);
  859. }
  860. }
  861. if (state != ss_open) {
  862. accept_cancel_state = accept_cancelled;
  863. if (allowcancel)
  864. return NULL;
  865. THROWJSOCKEXCEPTION(JSOCKERR_cancel_accept);
  866. }
  867. CSocket *ret = new CSocket(newsock,sm_tcp,true);
  868. ret->set_inherit(false);
  869. return ret;
  870. }
  871. void CSocket::set_linger(int lingertime)
  872. {
  873. struct linger l;
  874. l.l_onoff = (lingertime>=0)?1:0;
  875. l.l_linger = (lingertime>=0)?lingertime:0;
  876. if (setsockopt(sock, SOL_SOCKET, SO_LINGER, (char*)&l, sizeof(l)) != 0) {
  877. WARNLOG("Linger not set");
  878. }
  879. }
  880. void CSocket::set_keep_alive(bool set)
  881. {
  882. int on=set?1:0;
  883. if (setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (char*)&on, sizeof(on)) != 0) {
  884. WARNLOG("KeepAlive not set");
  885. }
  886. }
  887. int CSocket::name(char *retname,size32_t namemax)
  888. {
  889. if (!retname)
  890. namemax = 0;
  891. if (namemax)
  892. retname[0] = 0;
  893. retname[0] = 0;
  894. if (state != ss_open) {
  895. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  896. }
  897. DEFINE_SOCKADDR(u);
  898. socklen_t ul = sizeof(u);
  899. if (::getsockname(sock,&u.sa, &ul)<0) {
  900. THROWJSOCKEXCEPTION(ERRNO());
  901. }
  902. SocketEndpoint ep;
  903. getSockAddrEndpoint(u,ul,ep);
  904. StringBuffer s;
  905. ep.getIpText(s);
  906. if (namemax>=1) {
  907. if (namemax-1<s.length())
  908. s.setLength(namemax-1);
  909. memcpy(retname,s.str(),s.length());
  910. }
  911. return ep.port;
  912. }
  913. int CSocket::peer_name(char *retname,size32_t namemax)
  914. {
  915. // should not raise exceptions
  916. int ret = 0;
  917. if (!retname)
  918. namemax = 0;
  919. if (namemax)
  920. retname[0] = 0;
  921. if (state != ss_open) {
  922. return -1; // don't log as used to test socket
  923. }
  924. StringBuffer s;
  925. if (sockmode==sm_udp_server) { // udp server
  926. returnep.getIpText(s);
  927. ret = returnep.port;
  928. }
  929. else {
  930. DEFINE_SOCKADDR(u);
  931. socklen_t ul = sizeof(u);
  932. if (::getpeername(sock,&u.sa, &ul)<0)
  933. return -1; // don't log as used to test socket
  934. SocketEndpoint ep;
  935. getSockAddrEndpoint(u,ul,ep);
  936. ep.getIpText(s);
  937. ret = ep.port;
  938. }
  939. if (namemax>1) {
  940. if (namemax-1<s.length())
  941. s.setLength(namemax-1);
  942. memcpy(retname,s.str(),s.length()+1);
  943. }
  944. return ret;
  945. }
  946. SocketEndpoint &CSocket::getPeerEndpoint(SocketEndpoint &ep)
  947. {
  948. if (state != ss_open) {
  949. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  950. }
  951. StringBuffer s;
  952. if (sockmode==sm_udp_server) { // udp server
  953. ep.set(returnep);
  954. }
  955. else {
  956. DEFINE_SOCKADDR(u);
  957. socklen_t ul = sizeof(u);
  958. if (::getpeername(sock,&u.sa, &ul)<0) {
  959. DBGLOG("getpeername failed %d",ERRNO());
  960. ep.set(NULL, 0);
  961. }
  962. else
  963. getSockAddrEndpoint(u,ul,ep);
  964. }
  965. return ep;
  966. }
  967. IpAddress & CSocket::getPeerAddress(IpAddress &addr)
  968. {
  969. SocketEndpoint ep;
  970. getPeerEndpoint(ep);
  971. addr = ep;
  972. return addr;
  973. }
  974. void CSocket::set_return_addr(int port,const char *retname)
  975. {
  976. if (!returnep.ipset(retname)) {
  977. IJSOCK_Exception *e = new SocketException(JSOCKERR_bad_address); // don't use THROWJSOCKEXCEPTION here
  978. throw e;
  979. }
  980. returnep.port = port;
  981. }
  982. void CSocket::cancel_accept()
  983. {
  984. if (connectionless()) {
  985. THROWJSOCKEXCEPTION(JSOCKERR_connectionless_socket);
  986. }
  987. #ifdef SOCKTRACE
  988. PROGLOG("SOCKTRACE: Cancel accept socket %x %d (%x)", sock, sock, this);
  989. #endif
  990. if (!in_accept) {
  991. accept_cancel_state = accept_cancelled;
  992. errclose();
  993. return;
  994. }
  995. accept_cancel_state = accept_cancel_pending;
  996. errclose(); // this should cause accept to terminate (not supported on all linux though)
  997. #ifdef _WIN32
  998. for (unsigned i=0;i<5;i++) { // windows closes on different lower priority thread
  999. Sleep(i);
  1000. if (accept_cancel_state==accept_cancelled)
  1001. return;
  1002. }
  1003. #else
  1004. Sleep(0);
  1005. if (accept_cancel_state==accept_cancelled)
  1006. return;
  1007. #endif
  1008. // Wakeup listener using a connection
  1009. SocketEndpoint ep(hostport,queryHostIP());
  1010. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1011. try {
  1012. sock->connect_timeout(100,true);
  1013. }
  1014. catch (IJSOCK_Exception *e) {
  1015. EXCLOG(e,"CSocket::cancel_eccept");
  1016. e->Release();
  1017. }
  1018. }
  1019. // ================================================================================
  1020. // connect versions
  1021. ISocket* ISocket::connect( const SocketEndpoint &ep )
  1022. {
  1023. // general connect
  1024. return ISocket::connect_wait(ep,DEFAULT_CONNECT_TIME);
  1025. }
  1026. inline void refused_sleep(CTimeMon &tm, unsigned &refuseddelay)
  1027. {
  1028. unsigned remaining;
  1029. if (!tm.timedout(&remaining)) {
  1030. if (refuseddelay<remaining/4) {
  1031. Sleep(refuseddelay);
  1032. if (refuseddelay<CONNECT_TIMEOUT_REFUSED_WAIT/2)
  1033. refuseddelay *=2;
  1034. else
  1035. refuseddelay = CONNECT_TIMEOUT_REFUSED_WAIT;
  1036. }
  1037. else
  1038. Sleep(remaining/4); // towards end of timeout approach gradually
  1039. }
  1040. }
  1041. bool CSocket::connect_timeout( unsigned timeout, bool noexception)
  1042. {
  1043. // simple connect with timeout (no fancy stuff!)
  1044. unsigned startt = usTick();
  1045. CTimeMon tm(timeout);
  1046. unsigned remaining;
  1047. unsigned refuseddelay = 1;
  1048. int err;
  1049. while (!tm.timedout(&remaining)) {
  1050. err = pre_connect(false);
  1051. if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) {
  1052. T_FD_SET fds;
  1053. struct timeval tv;
  1054. XFD_ZERO(&fds);
  1055. FD_SET((unsigned)sock, &fds);
  1056. T_FD_SET except;
  1057. XFD_ZERO(&except);
  1058. FD_SET((unsigned)sock, &except);
  1059. tv.tv_sec = remaining / 1000;
  1060. tv.tv_usec = (remaining % 1000)*1000;
  1061. CHECKSOCKRANGE(sock);
  1062. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  1063. if (rc>0) {
  1064. // select succeeded - return error from socket (0 if connected)
  1065. socklen_t errlen = sizeof(err);
  1066. rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  1067. if ((rc!=0)&&!err)
  1068. err = ERRNO(); // some implementations of getsockopt duff
  1069. if (err) // probably ECONNREFUSED but treat all errors same
  1070. refused_sleep(tm,refuseddelay);
  1071. }
  1072. else if (rc<0) {
  1073. err = ERRNO();
  1074. LOGERR2(err,2,"::select");
  1075. }
  1076. }
  1077. if (err==0) {
  1078. err = post_connect();
  1079. if (err==0) {
  1080. STATS.connects++;
  1081. STATS.connecttime+=usTick()-startt;
  1082. #ifdef _TRACE
  1083. char peer[256];
  1084. peer[0] = 'C';
  1085. peer[1] = '!';
  1086. strcpy(peer+2,hostname?hostname:"(NULL)");
  1087. free(tracename);
  1088. tracename = strdup(peer);
  1089. #endif
  1090. return true;
  1091. }
  1092. }
  1093. errclose();
  1094. }
  1095. #ifdef SOCKTRACE
  1096. PROGLOG("connect_timeout: failed %d",err);
  1097. #endif
  1098. STATS.failedconnects++;
  1099. STATS.failedconnecttime+=usTick()-startt;
  1100. if (!noexception)
  1101. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1102. return false;
  1103. }
  1104. ISocket* ISocket::connect_timeout(const SocketEndpoint &ep,unsigned timeout)
  1105. {
  1106. if (ep.isNull()||(ep.port==0))
  1107. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  1108. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1109. sock->connect_timeout(timeout,false);
  1110. return sock.getClear();
  1111. }
  1112. #define POLLTIME 50
  1113. void CSocket::connect_wait(unsigned timems)
  1114. {
  1115. // simple connect with timeout (no fancy stuff!)
  1116. unsigned startt = usTick();
  1117. CTimeMon tm(timems);
  1118. bool exit = false;
  1119. int err;
  1120. unsigned refuseddelay = 1;
  1121. while (!exit) {
  1122. #ifdef CENTRAL_NODE_RANDOM_DELAY
  1123. ForEachItemIn(cn,CentralNodeArray) {
  1124. SocketEndpoint &ep=CentralNodeArray.item(cn);
  1125. if (ep.ipequals(targetip)) {
  1126. unsigned sleeptime = getRandom() % 1000;
  1127. StringBuffer s;
  1128. ep.getIpText(s);
  1129. PrintLog("Connection to central node %s - sleeping %d milliseconds", s.str(), sleeptime);
  1130. Sleep(sleeptime);
  1131. break;
  1132. }
  1133. }
  1134. #endif
  1135. unsigned remaining;
  1136. exit = tm.timedout(&remaining);
  1137. bool blockselect = exit; // if last time round block
  1138. {
  1139. CriticalBlock block(crit);
  1140. if (++connectingcount>4)
  1141. blockselect = true;
  1142. }
  1143. err = pre_connect(blockselect);
  1144. if (blockselect) {
  1145. if (err&&!exit)
  1146. refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same
  1147. }
  1148. else {
  1149. unsigned timeoutms = (exit||(remaining<10000))?10000:remaining;
  1150. unsigned polltime = 1;
  1151. while (!blockselect && ((err == EINPROGRESS)||(err == EWOULDBLOCK))) {
  1152. T_FD_SET fds;
  1153. struct timeval tv;
  1154. XFD_ZERO(&fds);
  1155. FD_SET((unsigned)sock, &fds);
  1156. T_FD_SET except;
  1157. XFD_ZERO(&except);
  1158. FD_SET((unsigned)sock, &except);
  1159. #ifdef BLOCK_POLLED_SINGLE_CONNECTS
  1160. tv.tv_sec = timeoutms / 1000;
  1161. tv.tv_usec = (timeoutms % 1000)*1000;
  1162. #else
  1163. tv.tv_sec = 0;
  1164. tv.tv_usec = 0;
  1165. #endif
  1166. CHECKSOCKRANGE(sock);
  1167. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  1168. if (rc>0) {
  1169. // select succeeded - return error from socket (0 if connected)
  1170. socklen_t errlen = sizeof(err);
  1171. rc = getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  1172. if ((rc!=0)&&!err)
  1173. err = ERRNO(); // some implementations of getsockopt duff
  1174. if (err)
  1175. refused_sleep(tm,refuseddelay); // probably ECONNREFUSED but treat all errors same
  1176. break;
  1177. }
  1178. if (rc<0) {
  1179. err = ERRNO();
  1180. LOGERR2(err,2,"::select");
  1181. break;
  1182. }
  1183. if (!timeoutms) {
  1184. #ifdef SOCKTRACE
  1185. PROGLOG("connecttimeout: timed out");
  1186. #endif
  1187. err = -1;
  1188. break;
  1189. }
  1190. #ifdef BLOCK_POLLED_SINGLE_CONNECTS
  1191. break;
  1192. #else
  1193. if (timeoutms<polltime)
  1194. polltime = timeoutms;
  1195. Sleep(polltime); // sleeps 1-50ms (to let other threads run)
  1196. timeoutms -= polltime;
  1197. if (polltime>POLLTIME/2)
  1198. polltime = POLLTIME;
  1199. else
  1200. polltime *= 2;
  1201. #endif
  1202. }
  1203. }
  1204. {
  1205. CriticalBlock block(crit);
  1206. --connectingcount;
  1207. }
  1208. if (err==0) {
  1209. err = post_connect();
  1210. if (err==0) {
  1211. STATS.connects++;
  1212. STATS.connecttime+=usTick()-startt;
  1213. #ifdef _TRACE
  1214. char peer[256];
  1215. peer[0] = 'C';
  1216. peer[1] = '!';
  1217. strcpy(peer+2,hostname?hostname:"(NULL)");
  1218. free(tracename);
  1219. tracename = strdup(peer);
  1220. #endif
  1221. return;
  1222. }
  1223. }
  1224. errclose();
  1225. }
  1226. #ifdef SOCKTRACE
  1227. PROGLOG("connect_wait: failed %d",err);
  1228. #endif
  1229. STATS.failedconnects++;
  1230. STATS.failedconnecttime+=usTick()-startt;
  1231. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1232. }
  1233. ISocket* ISocket::connect_wait( const SocketEndpoint &ep, unsigned timems)
  1234. {
  1235. if (ep.isNull()||(ep.port==0))
  1236. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  1237. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  1238. sock->connect_wait(timems);
  1239. return sock.getClear();
  1240. }
  1241. void CSocket::udpconnect()
  1242. {
  1243. DEFINE_SOCKADDR(u);
  1244. if (targetip.isNull()) {
  1245. set_return_addr(hostport,hostname);
  1246. targetip.ipset(returnep);
  1247. }
  1248. socklen_t ul = setSockAddr(u,targetip,hostport);
  1249. sock = ::socket(u.sa.sa_family, SOCK_DGRAM, targetip.isIp4()?0:PF_INET6);
  1250. #ifdef SOCKTRACE
  1251. PROGLOG("SOCKTRACE: udp connected socket %x %d (%x)", sock, sock, this);
  1252. #endif
  1253. STATS.activesockets++;
  1254. if (sock == INVALID_SOCKET) {
  1255. THROWJSOCKEXCEPTION(ERRNO());
  1256. }
  1257. int res = ::connect(sock, &u.sa, ul);
  1258. if (res != 0) { // works for UDP
  1259. closesock();
  1260. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  1261. }
  1262. nagling = false; // means nothing for UDP
  1263. state = ss_open;
  1264. #ifdef _TRACE
  1265. char peer[256];
  1266. peer[0] = 'C';
  1267. peer[1] = '!';
  1268. strcpy(peer+2,hostname?hostname:"(NULL)");
  1269. free(tracename);
  1270. tracename = strdup(peer);
  1271. #endif
  1272. }
  1273. int CSocket::wait_read(unsigned timeout)
  1274. {
  1275. int ret = 0;
  1276. while (sock!=INVALID_SOCKET) {
  1277. T_FD_SET fds;
  1278. XFD_ZERO(&fds);
  1279. FD_SET((unsigned)sock, &fds);
  1280. CHECKSOCKRANGE(sock);
  1281. if (timeout==WAIT_FOREVER) {
  1282. ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
  1283. }
  1284. else {
  1285. struct timeval tv;
  1286. tv.tv_sec = timeout / 1000;
  1287. tv.tv_usec = (timeout % 1000)*1000;
  1288. ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv );
  1289. }
  1290. if (ret==SOCKET_ERROR) {
  1291. int err = ERRNO();
  1292. if (err!=EINTRCALL) { // else retry (should adjust time but for our usage don't think it matters that much)
  1293. LOGERR2(err,1,"wait_read");
  1294. break;
  1295. }
  1296. }
  1297. else
  1298. break;
  1299. }
  1300. return ret;
  1301. }
  1302. int CSocket::wait_write(unsigned timeout)
  1303. {
  1304. int ret = 0;
  1305. while (sock!=INVALID_SOCKET) {
  1306. T_FD_SET fds;
  1307. XFD_ZERO(&fds);
  1308. FD_SET((unsigned)sock, &fds);
  1309. CHECKSOCKRANGE(sock);
  1310. if (timeout==WAIT_FOREVER) {
  1311. ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
  1312. }
  1313. else {
  1314. struct timeval tv;
  1315. tv.tv_sec = timeout / 1000;
  1316. tv.tv_usec = (timeout % 1000)*1000;
  1317. ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
  1318. }
  1319. if (ret==SOCKET_ERROR) {
  1320. int err = ERRNO();
  1321. if (err!=EINTRCALL) { // else retry (should adjust time but for our usage don't think it matters that much)
  1322. LOGERR2(err,1,"wait_write");
  1323. break;
  1324. }
  1325. }
  1326. else
  1327. break;
  1328. }
  1329. return ret;
  1330. }
  1331. void CSocket::readtms(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  1332. unsigned timeoutms)
  1333. {
  1334. if (timeoutms == WAIT_FOREVER) {
  1335. read(buf,min_size, max_size, size_read,WAIT_FOREVER);
  1336. return;
  1337. }
  1338. unsigned startt=usTick();
  1339. size_read = 0;
  1340. if (state != ss_open) {
  1341. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1342. }
  1343. unsigned start;
  1344. unsigned timeleft;
  1345. start = msTick();
  1346. timeleft = timeoutms;
  1347. do {
  1348. int rc = wait_read(timeleft);
  1349. if (rc < 0) {
  1350. THROWJSOCKEXCEPTION(ERRNO());
  1351. }
  1352. if (rc == 0) {
  1353. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1354. }
  1355. unsigned elapsed = (msTick()-start);
  1356. if (elapsed<timeoutms)
  1357. timeleft = timeoutms-elapsed;
  1358. else
  1359. timeleft = 0;
  1360. unsigned retrycount=100;
  1361. EintrRetry:
  1362. if (sockmode==sm_udp_server) { // udp server
  1363. DEFINE_SOCKADDR(u);
  1364. socklen_t ul=sizeof(u);
  1365. rc = recvfrom(sock, (char*)buf + size_read, max_size - size_read, 0, &u.sa,&ul);
  1366. getSockAddrEndpoint(u,ul,returnep);
  1367. }
  1368. else {
  1369. rc = recv(sock, (char*)buf + size_read, max_size - size_read, 0);
  1370. }
  1371. if (rc < 0) {
  1372. int err = ERRNO();
  1373. if (BADSOCKERR(err)) {
  1374. // don't think this should happen but convert to same as shutdown while investigation
  1375. LOGERR2(err,1,"Socket closed during read");
  1376. rc = 0;
  1377. }
  1378. else if ((err==EINTRCALL)&&(retrycount--!=0)) {
  1379. LOGERR2(err,1,"EINTR retrying");
  1380. goto EintrRetry;
  1381. }
  1382. else {
  1383. LOGERR2(err,1,"readtms");
  1384. if ((err==ECONNRESET)||(err==EINTRCALL)||(err==ECONNABORTED)) {
  1385. errclose();
  1386. err = JSOCKERR_broken_pipe;
  1387. }
  1388. THROWJSOCKEXCEPTION(err);
  1389. }
  1390. }
  1391. if (rc == 0) {
  1392. state = ss_shutdown;
  1393. if (min_size==0)
  1394. break; // if min_read is 0 return 0 if socket closed
  1395. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1396. }
  1397. size_read += rc;
  1398. } while (size_read < min_size);
  1399. STATS.reads++;
  1400. STATS.readsize += size_read;
  1401. STATS.readtime+=usTick()-startt;
  1402. }
  1403. void CSocket::read(void* buf, size32_t min_size, size32_t max_size, size32_t &size_read,
  1404. unsigned timeoutsecs)
  1405. {
  1406. unsigned startt=usTick();
  1407. size_read = 0;
  1408. unsigned start;
  1409. unsigned timeleft = 0;
  1410. if (state != ss_open) {
  1411. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1412. }
  1413. if (timeoutsecs != WAIT_FOREVER) {
  1414. start = (unsigned)time(NULL);
  1415. timeleft = timeoutsecs;
  1416. }
  1417. do {
  1418. int rc;
  1419. if (timeoutsecs != WAIT_FOREVER) {
  1420. rc = wait_read(timeleft*1000);
  1421. if (rc < 0) {
  1422. THROWJSOCKEXCEPTION(ERRNO());
  1423. }
  1424. if (rc == 0) {
  1425. THROWJSOCKEXCEPTION(JSOCKERR_timeout_expired);
  1426. }
  1427. unsigned elapsed = ((unsigned)time(NULL))-start;
  1428. if (elapsed<timeoutsecs)
  1429. timeleft = timeoutsecs-elapsed;
  1430. else
  1431. timeleft = 0;
  1432. }
  1433. unsigned retrycount=100;
  1434. EintrRetry:
  1435. if (sockmode==sm_udp_server) { // udp server
  1436. DEFINE_SOCKADDR(u);
  1437. socklen_t ul=sizeof(u.sin);
  1438. rc = recvfrom(sock, (char*)buf + size_read, max_size - size_read, 0, &u.sa,&ul);
  1439. getSockAddrEndpoint(u,ul,returnep);
  1440. }
  1441. else {
  1442. rc = recv(sock, (char*)buf + size_read, max_size - size_read, 0);
  1443. }
  1444. if (rc < 0) {
  1445. int err = ERRNO();
  1446. if (BADSOCKERR(err)) {
  1447. // don't think this should happen but convert to same as shutdown while investigation
  1448. LOGERR2(err,3,"Socket closed during read");
  1449. rc = 0;
  1450. }
  1451. else if ((err==EINTRCALL)&&(retrycount--!=0)) {
  1452. if (sock==INVALID_SOCKET)
  1453. rc = 0; // convert an EINTR after closed to a graceful close
  1454. else {
  1455. LOGERR2(err,3,"EINTR retrying");
  1456. goto EintrRetry;
  1457. }
  1458. }
  1459. else {
  1460. LOGERR2(err,3,"read");
  1461. if ((err==ECONNRESET)||(err==EINTRCALL)||(err==ECONNABORTED)) {
  1462. errclose();
  1463. err = JSOCKERR_broken_pipe;
  1464. }
  1465. THROWJSOCKEXCEPTION(err);
  1466. }
  1467. }
  1468. if (rc == 0) {
  1469. state = ss_shutdown;
  1470. if (min_size==0)
  1471. break; // if min_read is 0 return 0 if socket closed
  1472. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1473. }
  1474. size_read += rc;
  1475. } while (size_read < min_size);
  1476. STATS.reads++;
  1477. STATS.readsize += size_read;
  1478. STATS.readtime+=usTick()-startt;
  1479. }
  1480. void CSocket::read(void* buf, size32_t size)
  1481. {
  1482. if (!size)
  1483. return;
  1484. unsigned startt=usTick();
  1485. size32_t size_read=size;
  1486. if (state != ss_open) {
  1487. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1488. }
  1489. do {
  1490. unsigned retrycount=100;
  1491. EintrRetry:
  1492. int rc;
  1493. if (sockmode==sm_udp_server) { // udp server
  1494. DEFINE_SOCKADDR(u);
  1495. socklen_t ul=sizeof(u.sin);
  1496. rc = recvfrom(sock, (char*)buf, size, 0, &u.sa,&ul);
  1497. getSockAddrEndpoint(u,ul,returnep);
  1498. }
  1499. else {
  1500. rc = recv(sock, (char*)buf, size, 0);
  1501. }
  1502. if (rc < 0) {
  1503. int err = ERRNO();
  1504. if (BADSOCKERR(err)) {
  1505. // don't think this should happen but convert to same as shutdown while investigation
  1506. LOGERR2(err,5,"Socket closed during read");
  1507. rc = 0;
  1508. }
  1509. else if ((err==EINTRCALL)&&(retrycount--!=0)) {
  1510. LOGERR2(err,5,"EINTR retrying");
  1511. goto EintrRetry;
  1512. }
  1513. else {
  1514. LOGERR2(err,5,"read");
  1515. if ((err==ECONNRESET)||(err==EINTRCALL)||(err==ECONNABORTED)) {
  1516. errclose();
  1517. err = JSOCKERR_broken_pipe;
  1518. }
  1519. THROWJSOCKEXCEPTION(err);
  1520. }
  1521. }
  1522. if (rc == 0) {
  1523. state = ss_shutdown;
  1524. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1525. }
  1526. buf = (char*)buf + rc;
  1527. size -= rc;
  1528. } while (size != 0);
  1529. STATS.reads++;
  1530. STATS.readsize += size_read;
  1531. STATS.readtime+=usTick()-startt;
  1532. }
  1533. size32_t CSocket::write(void const* buf, size32_t size)
  1534. {
  1535. if (size==0)
  1536. return 0;
  1537. unsigned startt=usTick();
  1538. size32_t size_writ = size;
  1539. if (state != ss_open) {
  1540. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1541. }
  1542. size32_t res=0;
  1543. do {
  1544. unsigned retrycount=100;
  1545. EintrRetry:
  1546. int rc;
  1547. if (sockmode==sm_udp_server) { // udp server
  1548. DEFINE_SOCKADDR(u);
  1549. socklen_t ul = setSockAddr(u,returnep,returnep.port);
  1550. rc = sendto(sock, (char*)buf, size, 0, &u.sa, ul);
  1551. }
  1552. else {
  1553. rc = send(sock, (char*)buf, size, SEND_FLAGS);
  1554. }
  1555. if (rc < 0) {
  1556. int err=ERRNO();
  1557. if (BADSOCKERR(err)) {
  1558. LOGERR2(err,7,"Socket closed during write");
  1559. rc = 0;
  1560. }
  1561. else if ((err==EINTRCALL)&&(retrycount--!=0)) {
  1562. LOGERR2(err,7,"EINTR retrying");
  1563. goto EintrRetry;
  1564. }
  1565. else {
  1566. if (((sockmode==sm_multicast)||(sockmode==sm_udp))&&(err==ECONNREFUSED))
  1567. break; // ignore
  1568. LOGERR2(err,7,"write");
  1569. if ((err==ECONNRESET)||(err==EINTRCALL)||(err==ECONNABORTED)
  1570. #ifndef _WIN32
  1571. ||(err==EPIPE)||(err==ETIMEDOUT) // linux can raise these on broken pipe
  1572. #endif
  1573. ) {
  1574. errclose();
  1575. err = JSOCKERR_broken_pipe;
  1576. }
  1577. if ((err == EWOULDBLOCK) && nonblocking)
  1578. break;
  1579. THROWJSOCKEXCEPTION(err);
  1580. }
  1581. }
  1582. res += rc;
  1583. if (rc == 0) {
  1584. state = ss_shutdown;
  1585. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1586. }
  1587. if (nonblocking)
  1588. break;
  1589. buf = (char*)buf + rc;
  1590. size -= rc;
  1591. } while (size != 0);
  1592. STATS.writes++;
  1593. STATS.writesize += size_writ;
  1594. STATS.writetime+=usTick()-startt;
  1595. return res;
  1596. }
  1597. bool CSocket::check_connection()
  1598. {
  1599. if (state != ss_open)
  1600. return false;
  1601. unsigned retrycount=100;
  1602. EintrRetry:
  1603. int rc;
  1604. if (sockmode==sm_udp_server) { // udp server
  1605. DEFINE_SOCKADDR(u);
  1606. socklen_t ul = setSockAddr(u,returnep,returnep.port);
  1607. rc = sendto(sock, NULL, 0, 0, &u.sa, ul);
  1608. }
  1609. else {
  1610. rc = send(sock, NULL, 0, SEND_FLAGS);
  1611. }
  1612. if (rc < 0) {
  1613. int err=ERRNO();
  1614. if ((err==EINTRCALL)&&(retrycount--!=0)) {
  1615. LOGERR2(err,7,"EINTR retrying");
  1616. goto EintrRetry;
  1617. }
  1618. else
  1619. return false;
  1620. }
  1621. return true;
  1622. }
  1623. size32_t CSocket::udp_write_to(SocketEndpoint &ep, void const* buf, size32_t size)
  1624. {
  1625. if (size==0)
  1626. return 0;
  1627. unsigned startt=usTick();
  1628. size32_t size_writ = size;
  1629. if (state != ss_open) {
  1630. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1631. }
  1632. size32_t res=0;
  1633. DEFINE_SOCKADDR(u);
  1634. loop {
  1635. socklen_t ul = setSockAddr(u,ep,ep.port);
  1636. int rc = sendto(sock, (char*)buf, size, 0, &u.sa, ul);
  1637. if (rc < 0) {
  1638. int err=ERRNO();
  1639. if (((sockmode==sm_multicast)||(sockmode==sm_udp))&&(err==ECONNREFUSED))
  1640. break; // ignore
  1641. if (err!=EINTRCALL) {
  1642. THROWJSOCKEXCEPTION(err);
  1643. }
  1644. }
  1645. else {
  1646. res = (size32_t)rc;
  1647. break;
  1648. }
  1649. }
  1650. STATS.writes++;
  1651. STATS.writesize += res;
  1652. STATS.writetime+=usTick()-startt;
  1653. return res;
  1654. }
  1655. size32_t CSocket::write_multiple(unsigned num,const void **buf, size32_t *size)
  1656. {
  1657. assertex(sockmode!=sm_udp_server);
  1658. assertex(!nonblocking);
  1659. if (num==1)
  1660. return write(buf[0],size[0]);
  1661. size32_t total = 0;
  1662. unsigned i;
  1663. for (i=0;i<num;i++)
  1664. total += size[i];
  1665. if (total==0)
  1666. return 0;
  1667. unsigned startt=usTick();
  1668. if (state != ss_open) {
  1669. THROWJSOCKEXCEPTION(JSOCKERR_not_opened);
  1670. }
  1671. size32_t res=0;
  1672. #ifdef _WIN32
  1673. WSABUF *bufs = (WSABUF *)alloca(sizeof(WSABUF)*num);
  1674. for (i=0;i<num;i++) {
  1675. bufs[i].buf = (char *)buf[i];
  1676. bufs[i].len = size[i];
  1677. }
  1678. unsigned retrycount=100;
  1679. EintrRetry:
  1680. DWORD sent;
  1681. if (WSASendTo(sock,bufs,num,&sent,0,NULL,0,NULL,NULL)==SOCKET_ERROR) {
  1682. int err=ERRNO();
  1683. if (BADSOCKERR(err)) {
  1684. LOGERR2(err,8,"Socket closed during write");
  1685. sent = 0;
  1686. }
  1687. else if ((err==EINTRCALL)&&(retrycount--!=0)) {
  1688. LOGERR2(err,8,"EINTR retrying");
  1689. goto EintrRetry;
  1690. }
  1691. else {
  1692. LOGERR2(err,8,"write_multiple");
  1693. if ((err==ECONNRESET)||(err==EINTRCALL)||(err==ECONNABORTED)||(err==ETIMEDOUT)) {
  1694. errclose();
  1695. err = JSOCKERR_broken_pipe;
  1696. }
  1697. THROWJSOCKEXCEPTION(err);
  1698. }
  1699. }
  1700. if (sent == 0) {
  1701. state = ss_shutdown;
  1702. THROWJSOCKEXCEPTION(JSOCKERR_graceful_close);
  1703. }
  1704. res = sent;
  1705. #else
  1706. #ifdef USE_CORK
  1707. if (total>1024) {
  1708. class Copt
  1709. {
  1710. T_SOCKET sock;
  1711. bool nagling;
  1712. public:
  1713. Copt(T_SOCKET _sock,bool _nagling)
  1714. {
  1715. nagling = _nagling;
  1716. int enabled = 1;
  1717. int disabled = 0;
  1718. if (!nagling)
  1719. setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&disabled, sizeof(disabled));
  1720. setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&enabled, sizeof(enabled));
  1721. }
  1722. ~Copt()
  1723. {
  1724. int enabled = 1;
  1725. int disabled = 0;
  1726. setsockopt(sock, IPPROTO_TCP, TCP_CORK, (char*)&disabled, sizeof(disabled));
  1727. if (!nagling)
  1728. setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char*)&enabled, sizeof(enabled));
  1729. }
  1730. } copt(sock,nagling);
  1731. for (i=0;i<num;i++)
  1732. res += write(buf[i],size[i]);
  1733. }
  1734. else {
  1735. byte b[1024];
  1736. byte *p=b;
  1737. for (i=0;i<num;i++) {
  1738. memcpy(p,buf[i],size[i]);
  1739. p += size[i];
  1740. }
  1741. res = write(b,total);
  1742. }
  1743. #else
  1744. // send in equal chunks about 64K
  1745. unsigned n = (total+0xffff)/0x10000;
  1746. size32_t outbufsize = (total+n-1)/n;
  1747. MemoryAttr ma;
  1748. byte *outbuf = (byte *)ma.allocate(outbufsize);
  1749. size32_t outwr = 0;
  1750. i = 0;
  1751. size32_t os = 0;
  1752. size32_t left = total;
  1753. byte *b = NULL;
  1754. size32_t s=0;
  1755. loop {
  1756. while (!s&&(i<num)) {
  1757. b = (byte *)buf[i];
  1758. s = size[i];
  1759. i++;
  1760. }
  1761. if ((os==0)&&(s==left)) {
  1762. write(b,s); // go for it
  1763. break;
  1764. }
  1765. else {
  1766. size32_t cpy = outbufsize-os;
  1767. if (cpy>s)
  1768. cpy = s;
  1769. memcpy(outbuf+os,b,cpy);
  1770. os += cpy;
  1771. left -= cpy;
  1772. s -= cpy;
  1773. b += cpy;
  1774. if (left==0) {
  1775. write(outbuf,os);
  1776. break;
  1777. }
  1778. else if (os==outbufsize) {
  1779. write(outbuf,os);
  1780. os = 0;
  1781. }
  1782. }
  1783. }
  1784. #endif
  1785. #endif
  1786. STATS.writes++;
  1787. STATS.writesize += res;
  1788. STATS.writetime+=usTick()-startt;
  1789. return res;
  1790. }
  1791. bool CSocket::send_block(const void *blk,size32_t sz)
  1792. {
  1793. unsigned startt=usTick();
  1794. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  1795. unsigned startt2 = startt;
  1796. unsigned startt3 = startt;
  1797. #endif
  1798. if (blockflags&BF_SYNC_TRANSFER_PULL) {
  1799. size32_t rd;
  1800. bool eof = true;
  1801. readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
  1802. if (eof)
  1803. return false;
  1804. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  1805. startt2=usTick();
  1806. #endif
  1807. }
  1808. if (!blk||!sz) {
  1809. sz = 0;
  1810. write(&sz,sizeof(sz));
  1811. try {
  1812. bool reply;
  1813. size32_t rd;
  1814. readtms(&reply,sizeof(reply),sizeof(reply),rd,blocktimeoutms);
  1815. }
  1816. catch (IJSOCK_Exception *e) {
  1817. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  1818. EXCLOG(e,"CSocket::send_block");
  1819. e->Release();
  1820. }
  1821. return false;
  1822. }
  1823. size32_t rsz=sz;
  1824. _WINREV(rsz);
  1825. write(&rsz,sizeof(rsz));
  1826. if (blockflags&BF_SYNC_TRANSFER_PUSH) {
  1827. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  1828. startt2=usTick();
  1829. #endif
  1830. size32_t rd;
  1831. bool eof = true;
  1832. readtms(&eof,sizeof(eof),sizeof(eof),rd,blocktimeoutms);
  1833. if (eof)
  1834. return false;
  1835. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  1836. startt3=usTick();
  1837. #endif
  1838. }
  1839. write(blk,sz);
  1840. if (blockflags&BF_RELIABLE_TRANSFER) {
  1841. bool isok=false;
  1842. size32_t rd;
  1843. readtms(&isok,sizeof(isok),sizeof(isok),rd,blocktimeoutms);
  1844. if (!isok)
  1845. return false;
  1846. }
  1847. unsigned nowt = usTick();
  1848. unsigned elapsed = nowt-startt;
  1849. STATS.blocksendtime+=elapsed;
  1850. STATS.numblocksends++;
  1851. STATS.blocksendsize+=sz;
  1852. if (elapsed>STATS.longestblocksend) {
  1853. STATS.longestblocksend = elapsed;
  1854. STATS.longestblocksize = sz;
  1855. }
  1856. #ifdef TRACE_SLOW_BLOCK_TRANSFER
  1857. static unsigned lastreporttime=0;
  1858. static unsigned lastexceeded=0;
  1859. if (elapsed>1000000*60) { // over 1min
  1860. unsigned t = msTick();
  1861. if (1) { //((t-lastreporttime>1000*60) || // only report once per min
  1862. // (elapsed>lastexceeded*2)) {
  1863. lastexceeded = elapsed;
  1864. lastreporttime = t;
  1865. WARNLOG("send_block took %ds to %s (%d,%d,%d)",elapsed/1000000,tracename,startt2-startt,startt3-startt2,nowt-startt3);
  1866. }
  1867. }
  1868. #endif
  1869. return true;
  1870. }
  1871. #ifdef USERECVSEM
  1872. class CSemProtect
  1873. {
  1874. Semaphore *sem;
  1875. bool *owned;
  1876. public:
  1877. CSemProtect() { clear(); }
  1878. ~CSemProtect()
  1879. {
  1880. if (sem&&*owned) {
  1881. *owned = false;
  1882. sem->signal();
  1883. }
  1884. }
  1885. void set(Semaphore *_sem,bool *_owned)
  1886. {
  1887. sem = _sem;
  1888. owned = _owned;
  1889. }
  1890. bool wait(Semaphore *_sem,bool *_owned,unsigned timeout) {
  1891. if (!*_owned&&!_sem->wait(timeout))
  1892. return false;
  1893. *_owned = true;
  1894. set(_sem,_owned);
  1895. return true;
  1896. }
  1897. void clear() { sem = NULL; owned = NULL; }
  1898. };
  1899. #endif
  1900. size32_t CSocket::receive_block_size()
  1901. {
  1902. // assumed always paired with receive_block
  1903. if (nextblocksize) {
  1904. if (blockflags&BF_SYNC_TRANSFER_PULL) {
  1905. bool eof=false;
  1906. write(&eof,sizeof(eof));
  1907. }
  1908. size32_t rd;
  1909. readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
  1910. _WINREV(nextblocksize);
  1911. if (nextblocksize==0) { // confirm eof
  1912. try {
  1913. bool confirm=true;
  1914. write(&confirm,sizeof(confirm));
  1915. }
  1916. catch (IJSOCK_Exception *e) {
  1917. if ((e->errorCode()!=JSOCKERR_broken_pipe)&&(e->errorCode()!=JSOCKERR_graceful_close))
  1918. EXCLOG(e,"receive_block_size");
  1919. e->Release();
  1920. }
  1921. }
  1922. else if (blockflags&BF_SYNC_TRANSFER_PUSH) { // leaves receiveblocksem clear
  1923. #ifdef USERECVSEM
  1924. CSemProtect semprot; // this will catch exception in write
  1925. while (!semprot.wait(&receiveblocksem,&receiveblocksemowned,60*1000*5))
  1926. WARNLOG("Receive block stalled");
  1927. #endif
  1928. bool eof=false;
  1929. write(&eof,sizeof(eof));
  1930. #ifdef USERECVSEM
  1931. semprot.clear();
  1932. #endif
  1933. }
  1934. }
  1935. return nextblocksize;
  1936. }
  1937. size32_t CSocket::receive_block(void *blk,size32_t maxsize)
  1938. {
  1939. #ifdef USERECVSEM
  1940. CSemProtect semprot; // this will catch exceptions
  1941. #endif
  1942. size32_t sz = nextblocksize;
  1943. if (sz) {
  1944. if (sz==UINT_MAX) { // need to get size
  1945. if (!blk||!maxsize) {
  1946. if (blockflags&BF_SYNC_TRANSFER_PUSH) { // ignore block size
  1947. size32_t rd;
  1948. readtms(&nextblocksize,sizeof(nextblocksize),sizeof(nextblocksize),rd,blocktimeoutms);
  1949. }
  1950. if (blockflags&(BF_SYNC_TRANSFER_PULL|BF_SYNC_TRANSFER_PUSH)) { // signal eof
  1951. bool eof=true;
  1952. write(&eof,sizeof(eof));
  1953. nextblocksize = 0;
  1954. return 0;
  1955. }
  1956. }
  1957. sz = receive_block_size();
  1958. if (!sz)
  1959. return 0;
  1960. }
  1961. unsigned startt=usTick(); // include sem block but not initial handshake
  1962. #ifdef USERECVSEM
  1963. if (blockflags&BF_SYNC_TRANSFER_PUSH) // read_block_size sets semaphore
  1964. semprot.set(&receiveblocksem,&receiveblocksemowned); // this will reset semaphore on exit
  1965. #endif
  1966. nextblocksize = UINT_MAX;
  1967. size32_t rd;
  1968. if (sz<=maxsize) {
  1969. readtms(blk,sz,sz,rd,blocktimeoutms);
  1970. }
  1971. else { // truncate
  1972. readtms(blk,maxsize,maxsize,rd,blocktimeoutms);
  1973. sz -= maxsize;
  1974. void *tmp=malloc(sz);
  1975. readtms(tmp,sz,sz,rd,blocktimeoutms);
  1976. free(tmp);
  1977. sz = maxsize;
  1978. }
  1979. if (blockflags&BF_RELIABLE_TRANSFER) {
  1980. bool isok=true;
  1981. write(&isok,sizeof(isok));
  1982. }
  1983. unsigned elapsed = usTick()-startt;
  1984. STATS.blockrecvtime+=elapsed;
  1985. STATS.numblockrecvs++;
  1986. STATS.blockrecvsize+=sz;
  1987. }
  1988. return sz;
  1989. }
  1990. void CSocket::set_block_mode(unsigned flags, size32_t recsize, unsigned _timeoutms)
  1991. {
  1992. blockflags = flags;
  1993. nextblocksize = UINT_MAX;
  1994. blocktimeoutms = _timeoutms?_timeoutms:WAIT_FOREVER;
  1995. }
  1996. void CSocket::shutdown(unsigned mode)
  1997. {
  1998. if (state == ss_open) {
  1999. state = ss_shutdown;
  2000. #ifdef SOCKTRACE
  2001. PROGLOG("SOCKTRACE: shutdown(%d) socket %x %d (%x)", mode, sock, sock, this);
  2002. #endif
  2003. int rc = ::shutdown(sock, mode);
  2004. if (rc != 0) {
  2005. int err=ERRNO();
  2006. if (err==ENOTCONN) {
  2007. LOGERR2(err,9,"shutdown");
  2008. err = JSOCKERR_broken_pipe;
  2009. }
  2010. THROWJSOCKEXCEPTION(err);
  2011. }
  2012. }
  2013. }
  2014. void CSocket::errclose()
  2015. {
  2016. #ifdef USERECVSEM
  2017. if (receiveblocksemowned) {
  2018. receiveblocksemowned = false;
  2019. receiveblocksem.signal();
  2020. }
  2021. #endif
  2022. if (state != ss_close) {
  2023. state = ss_close;
  2024. #ifdef SOCKTRACE
  2025. PROGLOG("SOCKTRACE: errclose socket %x %d (%x)", sock, sock, this);
  2026. #endif
  2027. if (mcastreq)
  2028. setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq));
  2029. closesock();
  2030. }
  2031. }
  2032. void CSocket::close()
  2033. {
  2034. #ifdef USERECVSEM
  2035. if (receiveblocksemowned) {
  2036. receiveblocksemowned = false;
  2037. receiveblocksem.signal();
  2038. }
  2039. #endif
  2040. if (state != ss_close) {
  2041. #ifdef SOCKTRACE
  2042. PROGLOG("SOCKTRACE: close socket %x %d (%x)", sock, sock, this);
  2043. #endif
  2044. state = ss_close;
  2045. if (mcastreq)
  2046. setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)mcastreq,sizeof(*mcastreq));
  2047. if (closesock() != 0) {
  2048. THROWJSOCKEXCEPTION(ERRNO());
  2049. }
  2050. }
  2051. }
  2052. size32_t CSocket::get_max_send_size()
  2053. {
  2054. size32_t maxsend=0;
  2055. socklen_t size = sizeof(maxsend);
  2056. #if _WIN32
  2057. getsockopt(sock, SOL_SOCKET, SO_MAX_MSG_SIZE, (char *) &maxsend, &size);
  2058. #else
  2059. getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size); // not the same but closest I can find
  2060. #endif
  2061. return maxsend;
  2062. }
  2063. size32_t CSocket::get_send_buffer_size()
  2064. {
  2065. size32_t maxsend=0;
  2066. socklen_t size = sizeof(maxsend);
  2067. getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *) &maxsend, &size);
  2068. return maxsend;
  2069. }
  2070. void CSocket::set_send_buffer_size(size32_t maxsend)
  2071. {
  2072. if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&maxsend, sizeof(maxsend))!=0) {
  2073. LOGERR2(ERRNO(),1,"setsockopt(SO_SNDBUF)");
  2074. }
  2075. #ifdef CHECKBUFSIZE
  2076. size32_t v;
  2077. if (getsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&v, sizeof(v))!=0) {
  2078. LOGERR2(ERRNO(),1,"getsockopt(SO_SNDBUF)");
  2079. }
  2080. if (v!=maxsend)
  2081. WARNLOG("set_send_buffer_size requested %d, got %d",maxsend,v);
  2082. #endif
  2083. }
  2084. size32_t CSocket::get_receive_buffer_size()
  2085. {
  2086. size32_t max=0;
  2087. socklen_t size = sizeof(max);
  2088. getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *) &max, &size);
  2089. return max;
  2090. }
  2091. void CSocket::set_receive_buffer_size(size32_t max)
  2092. {
  2093. if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&max, sizeof(max))!=0) {
  2094. LOGERR2(ERRNO(),1,"setsockopt(SO_RCVBUF)");
  2095. }
  2096. #ifdef CHECKBUFSIZE
  2097. size32_t v;
  2098. if (getsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char *)&v, sizeof(v))!=0) {
  2099. LOGERR2(ERRNO(),1,"getsockopt(SO_RCVBUF)");
  2100. }
  2101. if (v<max)
  2102. WARNLOG("set_receive_buffer_size requested %d, got %d",max,v);
  2103. #endif
  2104. }
  2105. bool CSocket::join_multicast_group(SocketEndpoint &ep)
  2106. {
  2107. StringBuffer s;
  2108. ep.getIpText(s); // will improve later
  2109. MCASTREQ req(s.str());
  2110. if (setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,(char*)&req, sizeof(req))!=0) {
  2111. return false;
  2112. }
  2113. return true;
  2114. }
  2115. bool CSocket::leave_multicast_group(SocketEndpoint &ep)
  2116. {
  2117. StringBuffer s;
  2118. ep.getIpText(s); // will improve later
  2119. MCASTREQ req(s.str());
  2120. if (setsockopt(sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,(char*)&req, sizeof(req))!=0) {
  2121. return false;
  2122. }
  2123. return true;
  2124. }
  2125. CSocket::~CSocket()
  2126. {
  2127. if (owned)
  2128. close();
  2129. free(hostname);
  2130. hostname = NULL;
  2131. #ifdef _TRACE
  2132. free(tracename);
  2133. tracename = NULL;
  2134. #endif
  2135. delete mcastreq;
  2136. }
  2137. CSocket::CSocket(const SocketEndpoint &ep,SOCKETMODE smode,const char *name)
  2138. {
  2139. state = ss_close;
  2140. nonblocking = false;
  2141. #ifdef USERECVSEM
  2142. receiveblocksemowned = false;
  2143. #endif
  2144. nagling = true; // until turned off
  2145. hostport = ep.port;
  2146. hostname = NULL;
  2147. mcastreq = NULL;
  2148. tracename = NULL;
  2149. StringBuffer tmp;
  2150. if ((smode==sm_multicast_server)&&(name&&*name)) {
  2151. mcastreq = new MCASTREQ(name);
  2152. }
  2153. else {
  2154. if (!name&&!ep.isNull())
  2155. name = ep.getIpText(tmp).str();
  2156. hostname = name?strdup(name):NULL;
  2157. }
  2158. sock = INVALID_SOCKET;
  2159. sockmode = smode;
  2160. owned = true;
  2161. nextblocksize = 0;
  2162. in_accept = false;
  2163. accept_cancel_state = accept_not_cancelled;
  2164. #ifdef _TRACE
  2165. char peer[256];
  2166. peer[0] = name?'T':'S';
  2167. peer[1] = '>';
  2168. if (name)
  2169. strcpy(peer+2,name);
  2170. else {
  2171. SocketEndpoint self;
  2172. self.setLocalHost(0);
  2173. self.getUrlStr(peer+2,sizeof(peer)-2);
  2174. }
  2175. tracename = strdup(peer);
  2176. #endif
  2177. }
  2178. CSocket::CSocket(T_SOCKET new_sock,SOCKETMODE smode,bool _owned)
  2179. {
  2180. nonblocking = false;
  2181. #ifdef USERECVSEM
  2182. receiveblocksemowned = false;
  2183. #endif
  2184. nagling = true; // until turned off
  2185. sock = new_sock;
  2186. if (new_sock!=INVALID_SOCKET)
  2187. STATS.activesockets++;
  2188. hostname = NULL;
  2189. mcastreq = NULL;
  2190. hostport = 0;
  2191. tracename = NULL;
  2192. state = ss_open;
  2193. sockmode = smode;
  2194. owned = _owned;
  2195. nextblocksize = 0;
  2196. in_accept = false;
  2197. accept_cancel_state = accept_not_cancelled;
  2198. set_nagle(false);
  2199. //set_linger(DEFAULT_LINGER_TIME); -- experiment with removing this as closesocket should still endevour to send outstanding data
  2200. #ifdef _TRACE
  2201. char peer[256];
  2202. peer[0] = 'A';
  2203. peer[1] = '!';
  2204. peer_name(peer+2,sizeof(peer)-2);
  2205. tracename = strdup(peer);
  2206. #endif
  2207. }
  2208. ISocket* ISocket::create(unsigned short p,int listen_queue_size)
  2209. {
  2210. if (p==0)
  2211. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2212. SocketEndpoint ep;
  2213. ep.port = p;
  2214. Owned<CSocket> sock = new CSocket(ep,sm_tcp_server,NULL);
  2215. sock->open(listen_queue_size);
  2216. return sock.getClear();
  2217. }
  2218. ISocket* ISocket::create_ip(unsigned short p,const char *host,int listen_queue_size)
  2219. {
  2220. if (p==0)
  2221. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2222. SocketEndpoint ep(host,p);
  2223. Owned<CSocket> sock = new CSocket(ep,sm_tcp_server,host);
  2224. sock->open(listen_queue_size);
  2225. return sock.getClear();
  2226. }
  2227. ISocket* ISocket::udp_create(unsigned short p)
  2228. {
  2229. SocketEndpoint ep;
  2230. ep.port=p;
  2231. Owned<CSocket> sock = new CSocket(ep,(p==0)?sm_udp:sm_udp_server,NULL);
  2232. sock->open(0);
  2233. return sock.getClear();
  2234. }
  2235. ISocket* ISocket::multicast_create(unsigned short p, const char *mcip)
  2236. {
  2237. if (p==0)
  2238. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2239. SocketEndpoint ep(mcip,p);
  2240. Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,mcip);
  2241. sock->open(0,true);
  2242. return sock.getClear();
  2243. }
  2244. ISocket* ISocket::multicast_create(unsigned short p, const IpAddress &ip)
  2245. {
  2246. if (p==0)
  2247. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2248. SocketEndpoint ep(p, ip);
  2249. StringBuffer tmp;
  2250. Owned<CSocket> sock = new CSocket(ep,sm_multicast_server,ip.getIpText(tmp).str());
  2251. sock->open(0,true);
  2252. return sock.getClear();
  2253. }
  2254. ISocket* ISocket::udp_connect(unsigned short p, char const* name)
  2255. {
  2256. if (!name||!*name||(p==0))
  2257. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2258. SocketEndpoint ep(name, p);
  2259. Owned<CSocket> sock = new CSocket(ep,sm_udp,name);
  2260. sock->udpconnect();
  2261. return sock.getClear();
  2262. }
  2263. ISocket* ISocket::udp_connect(const SocketEndpoint &ep)
  2264. {
  2265. Owned<CSocket> sock = new CSocket(ep,sm_udp,NULL);
  2266. sock->udpconnect();
  2267. return sock.getClear();
  2268. }
  2269. ISocket* ISocket::multicast_connect(unsigned short p, char const* mcip, unsigned _ttl)
  2270. {
  2271. if (p==0)
  2272. THROWJSOCKEXCEPTION2(JSOCKERR_bad_address);
  2273. SocketEndpoint ep(mcip,p);
  2274. return multicast_connect(ep, _ttl);
  2275. }
  2276. ISocket* ISocket::multicast_connect(const SocketEndpoint &ep, unsigned _ttl)
  2277. {
  2278. Owned<CSocket> sock = new CSocket(ep,sm_multicast,NULL);
  2279. sock->udpconnect();
  2280. u_char ttl = _ttl;
  2281. setsockopt(sock->OShandle(), IPPROTO_IP, IP_MULTICAST_TTL, (char *) &ttl, sizeof(ttl));
  2282. return sock.getClear();
  2283. }
  2284. ISocket* ISocket::attach(int s, bool tcpip)
  2285. {
  2286. CSocket* sock = new CSocket((SOCKET)s, tcpip?sm_tcp:sm_udp, false);
  2287. return sock;
  2288. }
  2289. bool isInterfaceIp(const IpAddress &ip, const char *ifname)
  2290. {
  2291. #ifdef _WIN32
  2292. return false;
  2293. #else
  2294. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2295. if (fd<0)
  2296. return false;
  2297. MemoryAttr ma;
  2298. char *buf = (char *)ma.allocate(1024);
  2299. struct ifconf ifc;
  2300. ifc.ifc_len = 1024;
  2301. ifc.ifc_buf = buf;
  2302. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2303. return false;
  2304. struct ifreq *ifr = ifc.ifc_req;
  2305. unsigned n = ifc.ifc_len/sizeof(struct ifreq);
  2306. bool match = false;
  2307. for(unsigned i=0; i<n; i++)
  2308. {
  2309. struct ifreq *item = &ifr[i];
  2310. if (ifname&&*ifname)
  2311. if (!WildMatch(item->ifr_name,ifname))
  2312. continue;
  2313. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2314. if (ip.ipequals(iptest))
  2315. {
  2316. match = true;
  2317. break;
  2318. }
  2319. }
  2320. close(fd);
  2321. return match;
  2322. #endif
  2323. }
  2324. bool getInterfaceIp(IpAddress &ip,const char *ifname)
  2325. {
  2326. #ifdef _WIN32
  2327. return false;
  2328. #else
  2329. ip.ipset(NULL);
  2330. int fd = socket(AF_INET, SOCK_DGRAM, 0); // IPV6 TBD
  2331. if (fd<0)
  2332. return false;
  2333. MemoryAttr ma;
  2334. char *buf = (char *)ma.allocate(1024);
  2335. struct ifconf ifc;
  2336. ifc.ifc_len = 1024;
  2337. ifc.ifc_buf = buf;
  2338. if(ioctl(fd, SIOCGIFCONF, &ifc) < 0) // query interfaces
  2339. return false;
  2340. struct ifreq *ifr = ifc.ifc_req;
  2341. unsigned n = ifc.ifc_len/sizeof(struct ifreq);
  2342. for (int loopback = 0; loopback <= 1; loopback++)
  2343. {
  2344. for (int i=0; i<n; i++)
  2345. {
  2346. bool useLoopback = (loopback==1);
  2347. struct ifreq *item = &ifr[i];
  2348. if (ifname&&*ifname)
  2349. if (!WildMatch(item->ifr_name,ifname))
  2350. continue;
  2351. IpAddress iptest((inet_ntoa(((struct sockaddr_in *)&item->ifr_addr)->sin_addr)));
  2352. if (iptest.isLoopBack() == useLoopback)
  2353. {
  2354. if (ip.isNull())
  2355. ip.ipset(iptest);
  2356. else if (!PreferredSubnet.isNull()&&!PreferredSubnet.test(ip)&&PreferredSubnet.test(iptest))
  2357. ip.ipset(iptest);
  2358. }
  2359. }
  2360. if (!ip.isNull())
  2361. break;
  2362. }
  2363. close(fd);
  2364. return !ip.isNull();
  2365. #endif
  2366. }
// Process-wide host identity cache, filled lazily by GetCachedHostName(),
// queryLocalIP() and queryHostIP(); those functions take hostnamesect first.
static StringAttr cachehostname;        // host name (or IP text) of this machine
static IpAddress cachehostip;           // resolved address of this machine
static IpAddress localhostip;           // loopback address handed out by queryLocalIP()
static CriticalSection hostnamesect;    // guards the cached values above
static StringBuffer EnvConfPath;        // path of environment.conf, built on first use
  2372. const char * GetCachedHostName()
  2373. {
  2374. CriticalBlock c(hostnamesect);
  2375. if (!cachehostname.get())
  2376. {
  2377. #ifndef _WIN32
  2378. IpAddress ip;
  2379. if (EnvConfPath.length() == 0)
  2380. EnvConfPath.append(CONFIG_DIR).append(PATHSEPSTR).append("environment.conf");
  2381. Owned<IProperties> conf = createProperties(EnvConfPath.str(), true);
  2382. StringBuffer ifs;
  2383. conf->getProp("interface", ifs);
  2384. if (getInterfaceIp(ip, ifs.str()))
  2385. {
  2386. StringBuffer ips;
  2387. ip.getIpText(ips);
  2388. if (ips.length())
  2389. {
  2390. cachehostname.set(ips.str());
  2391. cachehostip.ipset(ip);
  2392. return cachehostname.get();
  2393. }
  2394. }
  2395. #endif
  2396. char temp[1024];
  2397. if (gethostname(temp, sizeof(temp))==0)
  2398. cachehostname.set(temp);
  2399. else
  2400. cachehostname.set("localhost"); // assume no NIC card
  2401. }
  2402. return cachehostname.get();
  2403. }
  2404. IpAddress & queryLocalIP()
  2405. {
  2406. CriticalBlock c(hostnamesect);
  2407. if (localhostip.isNull())
  2408. if (IP6preferred)
  2409. localhostip.ipset("::1"); //IPv6
  2410. else
  2411. localhostip.ipset("127.0.0.1"); //IPv4
  2412. return localhostip;
  2413. }
  2414. IpAddress & queryHostIP()
  2415. {
  2416. CriticalBlock c(hostnamesect);
  2417. if (cachehostip.isNull()) {
  2418. if (!cachehostip.ipset(GetCachedHostName())) {
  2419. cachehostip.ipset(queryLocalIP());
  2420. printf("hostname %s not resolved, using localhost\n",GetCachedHostName()); // don't use jlog in case recursive
  2421. }
  2422. }
  2423. return cachehostip;
  2424. }
  2425. IpAddress &GetHostIp(IpAddress &ip)
  2426. {
  2427. ip.ipset(queryHostIP());
  2428. return ip;
  2429. }
  2430. IpAddress &localHostToNIC(IpAddress &ip)
  2431. {
  2432. if (ip.isLoopBack())
  2433. GetHostIp(ip);
  2434. return ip;
  2435. }
  2436. // IpAddress
  2437. inline bool isIp4(const unsigned *netaddr)
  2438. {
  2439. if (IP4only)
  2440. return true;
  2441. if (netaddr[2]==0xffff0000)
  2442. return (netaddr[1]==0)&&(netaddr[0]==0);
  2443. if (netaddr[2]==0)
  2444. if ((netaddr[3]==0)&&(netaddr[0]==0)&&(netaddr[1]==0))
  2445. return true; // null address
  2446. // maybe should get loopback here
  2447. return false;
  2448. }
  2449. bool IpAddress::isIp4() const
  2450. {
  2451. return ::isIp4(netaddr);
  2452. }
  2453. bool IpAddress::isNull() const
  2454. {
  2455. return (netaddr[3]==0)&&(IP4only||((netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0)));
  2456. }
  2457. bool IpAddress::isLoopBack() const
  2458. {
  2459. if (::isIp4(netaddr)&&((netaddr[3] & 0x000000ff)==0x000007f))
  2460. return true;
  2461. return (netaddr[3]==0x1000000)&&(netaddr[2]==0)&&(netaddr[1]==0)&&(netaddr[0]==0);
  2462. }
  2463. bool IpAddress::isLocal() const
  2464. {
  2465. if (isLoopBack() || isHost())
  2466. return true;
  2467. IpAddress ip(*this);
  2468. return isInterfaceIp(ip, NULL);
  2469. }
  2470. bool IpAddress::ipequals(const IpAddress & other) const
  2471. {
  2472. // reverse compare for speed
  2473. return (other.netaddr[3]==netaddr[3])&&(IP4only||((other.netaddr[2]==netaddr[2])&&(other.netaddr[1]==netaddr[1])&&(other.netaddr[0]==netaddr[0])));
  2474. }
  2475. int IpAddress::ipcompare(const IpAddress & other) const
  2476. {
  2477. return memcmp(&netaddr, &other.netaddr, sizeof(netaddr));
  2478. }
  2479. unsigned IpAddress::iphash(unsigned prev) const
  2480. {
  2481. return hashc((const byte *)&netaddr,sizeof(netaddr),prev);
  2482. }
  2483. bool IpAddress::isHost() const
  2484. {
  2485. return ipequals(queryHostIP());
  2486. }
  2487. static bool decodeNumericIP(const char *text,unsigned *netaddr)
  2488. {
  2489. if (!text)
  2490. return false;
  2491. bool isv6 = strchr(text,':')!=NULL;
  2492. StringBuffer tmp;
  2493. if ((*text=='[')&&!IP4only) {
  2494. text++;
  2495. size32_t l = strlen(text);
  2496. if ((l<=2)||(text[l-1]!=']'))
  2497. return false;
  2498. text = tmp.append(l-2,text);
  2499. }
  2500. if (!isv6&&isdigit(text[0])) {
  2501. if (_inet_pton(AF_INET, text, &netaddr[3])>0) {
  2502. netaddr[2] = netaddr[3]?0xffff0000:0; // check for NULL
  2503. netaddr[1] = 0;
  2504. netaddr[0] = 0; // special handling for loopback?
  2505. return true;
  2506. }
  2507. }
  2508. else if (isv6&&!IP4only) {
  2509. int ret = _inet_pton(AF_INET6, text, netaddr);
  2510. if (ret>=0)
  2511. return (ret>0);
  2512. int err = ERRNO();
  2513. StringBuffer tmp("_inet_pton: ");
  2514. tmp.append(text);
  2515. LOGERR(err,1,tmp.str());
  2516. }
  2517. return false;
  2518. }
// Resolves host name 'name' to an address and stores it in the 4-word
// 'netaddr' form (IPv4 results go in word 3 with word 2 = 0xffff0000).
// Returns true on success, false if resolution ultimately fails.
//
// Two resolver paths exist:
//  * gethostbyname (IPv4 only) - used when IP4only is set, or unconditionally
//    on builds without getaddrinfo;
//  * getaddrinfo - used otherwise; can yield IPv4 or IPv6.
// Both retry up to 10 times, sleeping 100ms longer after each failure, and
// both prefer an address inside PreferredSubnet when one is configured.
static bool lookupHostAddress(const char *name,unsigned *netaddr)
{
    // if IP4only or using MS V6 can only resolve IPv4 using gethostbyname
    static bool recursioncheck = false; // needed to stop error message recursing
    unsigned retry=10;
#if defined(__linux__) || defined(getaddrinfo)
    if (IP4only) {
#else
    {
#endif
        // gethostbyname path; the call is serialised through hostnamesect
        CriticalBlock c(hostnamesect);
        hostent * entry = gethostbyname(name);
        while (entry==NULL) {
            if (retry--==0) {
                // out of retries: log once (guarding against the logger
                // itself triggering another lookup) and give up
                if (!recursioncheck) {
                    recursioncheck = true;
                    LogErr(h_errno,1,"gethostbyname failed",__LINE__,name);
                    recursioncheck = false;
                }
                return false;
            }
            {
                // release the lock while sleeping before the next attempt
                CriticalUnblock ub(hostnamesect);
                Sleep((10-retry)*100);
            }
            entry = gethostbyname(name);
        }
        if (entry->h_addr_list[0]) {
            unsigned ptr = 0;
            if (!PreferredSubnet.isNull()) {
                // scan entries 1.. for one inside the preferred subnet;
                // fall back to entry 0 if none matches
                loop {
                    ptr++;
                    if (entry->h_addr_list[ptr]==NULL) {
                        ptr = 0;
                        break;
                    }
                    IpAddress ip;
                    ip.setNetAddress(sizeof(unsigned),entry->h_addr_list[ptr]);
                    if (PreferredSubnet.test(ip))
                        break;
                }
            }
            memcpy(&netaddr[3], entry->h_addr_list[ptr], sizeof(netaddr[3]));
            netaddr[2] = 0xffff0000;
            netaddr[1] = 0;
            netaddr[0] = 0;
            return true;
        }
        return false;
    }
#if defined(__linux__) || defined(getaddrinfo)
    // getaddrinfo path
    struct addrinfo hints;
    memset(&hints,0,sizeof(hints));
    struct addrinfo *addrInfo = NULL;
    loop {
        memset(&hints,0,sizeof(hints));
        int ret = getaddrinfo(name, NULL , &hints, &addrInfo);
        if (!ret)
            break;
        if (retry--==0) {
            if (!recursioncheck) {
                recursioncheck = true;
                LogErr(ret,1,"getaddrinfo failed",__LINE__,name);
#ifdef _DEBUG
                PrintStackReport();
#endif
                recursioncheck = false;
            }
            return false;
        }
        Sleep((10-retry)*100);
    }
    // Choose the best result.  First pass (snm true) only accepts addresses
    // in PreferredSubnet; if that finds nothing a second pass accepts any.
    // Between families, IPv6 wins only when IP6preferred is set.
    struct addrinfo  *best = NULL;
    bool snm = !PreferredSubnet.isNull();
    loop {
        struct addrinfo  *ai;
        for (ai = addrInfo; ai; ai = ai->ai_next) {
            // debugging aid, disabled:
            // printf("flags=%d, family=%d, socktype=%d, protocol=%d, addrlen=%d, canonname=%s\n",ai->ai_flags,ai->ai_family,ai->ai_socktype,ai->ai_protocol,ai->ai_addrlen,ai->ai_canonname?ai->ai_canonname:"NULL");
            switch (ai->ai_family) {
            case AF_INET: {
                    if (snm) {
                        IpAddress ip;
                        ip.setNetAddress(sizeof(in_addr),&(((sockaddr_in *)ai->ai_addr)->sin_addr));
                        if (!PreferredSubnet.test(ip))
                            continue;   // next addrinfo entry
                    }
                    if ((best==NULL)||((best->ai_family==AF_INET6)&&!IP6preferred))
                        best = ai;
                    break;
                }
            case AF_INET6: {
                    if (snm) {
                        IpAddress ip;
                        ip.setNetAddress(sizeof(in_addr6),&(((sockaddr_in6 *)ai->ai_addr)->sin6_addr));
                        if (!PreferredSubnet.test(ip))
                            continue;   // next addrinfo entry
                    }
                    if ((best==NULL)||((best->ai_family==AF_INET)&&IP6preferred))
                        best = ai;
                    break;
                }
            }
        }
        if (best||!snm)
            break;
        snm = false;
    }
    if (best) {
        if (best->ai_family==AF_INET6)
            memcpy(netaddr,&(((sockaddr_in6 *)best->ai_addr)->sin6_addr),sizeof(in6_addr));
        else {
            memcpy(netaddr+3,&(((sockaddr_in *)best->ai_addr)->sin_addr),sizeof(in_addr));
            netaddr[2] = 0xffff0000;
            netaddr[1] = 0;
            netaddr[0] = 0;
        }
    }
    freeaddrinfo(addrInfo);
    return best!=NULL;
#endif
    return false;
}
  2641. bool IpAddress::ipset(const char *text)
  2642. {
  2643. if (text&&*text) {
  2644. if ((text[0]=='.')&&(text[1]==0)) {
  2645. ipset(queryHostIP());
  2646. return true;
  2647. }
  2648. if (decodeNumericIP(text,netaddr))
  2649. return true;
  2650. const char *s;
  2651. for (s=text;*s;s++)
  2652. if (!isdigit(*s)&&(*s!=':')&&(*s!='.'))
  2653. break;
  2654. if (!*s)
  2655. return ipset(NULL);
  2656. if (lookupHostAddress(text,netaddr))
  2657. return true;
  2658. }
  2659. memset(&netaddr,0,sizeof(netaddr));
  2660. return false;
  2661. }
  2662. inline char * addbyte(char *s,byte b)
  2663. {
  2664. if (b>=100) {
  2665. *(s++) = b/100+'0';
  2666. b %= 100;
  2667. *(s++) = b/10+'0';
  2668. b %= 10;
  2669. }
  2670. else if (b>=10) {
  2671. *(s++) = b/10+'0';
  2672. b %= 10;
  2673. }
  2674. *(s++) = b+'0';
  2675. return s;
  2676. }
  2677. StringBuffer & IpAddress::getIpText(StringBuffer & out) const
  2678. {
  2679. if (::isIp4(netaddr)) {
  2680. const byte *ip = (const byte *)&netaddr[3];
  2681. char ips[16];
  2682. char *s = ips;
  2683. for (unsigned i=0;i<4;i++) {
  2684. if (i)
  2685. *(s++) = '.';
  2686. s = addbyte(s,ip[i]);
  2687. }
  2688. return out.append(s-ips,ips);
  2689. }
  2690. char tmp[INET6_ADDRSTRLEN];
  2691. const char *res = _inet_ntop(AF_INET6, &netaddr, tmp, sizeof(tmp));
  2692. if (!res)
  2693. throw MakeOsException(errno);
  2694. return out.append(res);
  2695. }
  2696. void IpAddress::ipserialize(MemoryBuffer & out) const
  2697. {
  2698. if (((netaddr[2]==0xffff0000)||(netaddr[2]==0))&&(netaddr[1]==0)&&(netaddr[0]==0)) {
  2699. if (netaddr[3]==IPV6_SERIALIZE_PREFIX)
  2700. throw MakeStringException(-1,"Invalid network address"); // hack prevention
  2701. out.append(sizeof(netaddr[3]), &netaddr[3]);
  2702. }
  2703. else {
  2704. unsigned pfx = IPV6_SERIALIZE_PREFIX;
  2705. out.append(sizeof(pfx),&pfx).append(sizeof(netaddr),&netaddr);
  2706. }
  2707. }
  2708. void IpAddress::ipdeserialize(MemoryBuffer & in)
  2709. {
  2710. unsigned pfx;
  2711. in.read(sizeof(pfx),&pfx);
  2712. if (pfx!=IPV6_SERIALIZE_PREFIX) {
  2713. netaddr[0] = 0;
  2714. netaddr[1] = 0;
  2715. netaddr[2] = (pfx == 0 || pfx == 0x1000000) ? 0 : 0xffff0000; // catch null and loopback
  2716. netaddr[3] = pfx;
  2717. }
  2718. else
  2719. in.read(sizeof(netaddr),&netaddr);
  2720. }
  2721. unsigned IpAddress::ipdistance(const IpAddress &other,unsigned offset) const
  2722. {
  2723. if (offset>3)
  2724. offset = 3;
  2725. int i1;
  2726. _cpyrev4(&i1,&netaddr[3-offset]);
  2727. int i2;
  2728. _cpyrev4(&i2,&other.netaddr[3-offset]);
  2729. i1-=i2;
  2730. if (i1>0)
  2731. return i1;
  2732. return -i1;
  2733. }
  2734. bool IpAddress::ipincrement(unsigned count,byte minoctet,byte maxoctet,unsigned short minipv6piece,unsigned maxipv6piece)
  2735. {
  2736. unsigned base;
  2737. if (::isIp4(netaddr)) {
  2738. base = maxoctet-minoctet+1;
  2739. if (!base||(base>256))
  2740. return false;
  2741. byte * ips = (byte *)&netaddr[3];
  2742. byte * ip = ips+4;
  2743. while (count) {
  2744. if (ip==ips)
  2745. return false; // overflow
  2746. ip--;
  2747. unsigned v = (count+((*ip>minoctet)?(*ip-minoctet):0));
  2748. *ip = minoctet + v%base;
  2749. count = v/base;
  2750. }
  2751. }
  2752. else {
  2753. base = maxipv6piece-minipv6piece+1;
  2754. if (!base||(base>0x10000))
  2755. return false;
  2756. unsigned short * ps = (unsigned short *)&netaddr;
  2757. unsigned short * p = ps+8;
  2758. while (count) {
  2759. if (p==ps)
  2760. return false; // overflow (actually near impossible!)
  2761. p--;
  2762. unsigned v = (count+((*p>minipv6piece)?(*p-minipv6piece):0));
  2763. *p = minipv6piece + v%base;
  2764. count = v/base;
  2765. }
  2766. }
  2767. return true;
  2768. }
  2769. unsigned IpAddress::ipsetrange( const char *text) // e.g. 10.173.72.1-65 ('-' may be omitted)
  2770. {
  2771. unsigned e=0;
  2772. unsigned f=0;
  2773. const char *r = strchr(text,'-');
  2774. bool ok;
  2775. if (r) {
  2776. e = atoi(r+1);
  2777. StringBuffer tmp(r-text,text);
  2778. ok = ipset(tmp.str());
  2779. if (!::isIp4(netaddr))
  2780. IPV6_NOT_IMPLEMENTED(); // TBD IPv6
  2781. if (ok) {
  2782. while ((r!=text)&&(*(r-1)!='.'))
  2783. r--;
  2784. f = (r!=text)?atoi(r):0;
  2785. }
  2786. }
  2787. else
  2788. ok = ipset(text);
  2789. if ((f>e)||!ok)
  2790. return 0;
  2791. return e-f+1;
  2792. }
  2793. size32_t IpAddress::getNetAddress(size32_t maxsz,void *dst) const
  2794. {
  2795. if (maxsz==sizeof(unsigned)) {
  2796. if (::isIp4(netaddr)) {
  2797. *(unsigned *)dst = netaddr[3];
  2798. return maxsz;
  2799. }
  2800. }
  2801. else if (!IP4only&&(maxsz==sizeof(netaddr))) {
  2802. memcpy(dst,&netaddr,maxsz);
  2803. return maxsz;
  2804. }
  2805. return 0;
  2806. }
  2807. void IpAddress::setNetAddress(size32_t sz,const void *src)
  2808. {
  2809. if (sz==sizeof(unsigned)) { // IPv4
  2810. netaddr[0] = 0;
  2811. netaddr[1] = 0;
  2812. netaddr[2]=0xffff0000;
  2813. netaddr[3] = *(const unsigned *)src;
  2814. }
  2815. else if (!IP4only&&(sz==sizeof(netaddr))) { // IPv6
  2816. memcpy(&netaddr,src,sz);
  2817. if ((netaddr[2]==0)&&(netaddr[3]!=0)&&(netaddr[3]!=0x1000000)&&(netaddr[0]==0)&&(netaddr[1]==0))
  2818. netaddr[2]=0xffff0000; // use this form only
  2819. }
  2820. else
  2821. memset(&netaddr,0,sizeof(netaddr));
  2822. }
  2823. void SocketEndpoint::deserialize(MemoryBuffer & in)
  2824. {
  2825. ipdeserialize(in);
  2826. in.read(port);
  2827. }
  2828. void SocketEndpoint::serialize(MemoryBuffer & out) const
  2829. {
  2830. ipserialize(out);
  2831. out.append(port);
  2832. }
  2833. bool SocketEndpoint::set(const char *name,unsigned short _port)
  2834. {
  2835. if (name) {
  2836. if (*name=='[') {
  2837. const char *s = name+1;
  2838. const char *t = strchr(s,']');
  2839. if (t) {
  2840. StringBuffer tmp(t-s,s);
  2841. if (t[1]==':')
  2842. _port = atoi(t+2);
  2843. return set(tmp.str(),_port);
  2844. }
  2845. }
  2846. const char * colon = strchr(name, ':');
  2847. if (colon) {
  2848. if (!IP4only&&strchr(colon+1, ':'))
  2849. colon = NULL; // hello its IpV6
  2850. }
  2851. else
  2852. colon = strchr(name, '|'); // strange hole convention
  2853. char ips[260];
  2854. if (colon) {
  2855. size32_t l = colon-name;
  2856. if (l>=sizeof(ips))
  2857. l = sizeof(ips)-1;
  2858. memcpy(ips,name,l);
  2859. ips[l] = 0;
  2860. name = ips;
  2861. _port = atoi(colon+1);
  2862. }
  2863. if (ipset(name)) {
  2864. port = _port;
  2865. return true;
  2866. }
  2867. }
  2868. ipset(NULL);
  2869. port = 0;
  2870. return false;
  2871. }
  2872. void SocketEndpoint::getUrlStr(char * str, size32_t len) const
  2873. {
  2874. if (len==0)
  2875. return;
  2876. StringBuffer _str;
  2877. getUrlStr(_str);
  2878. size32_t l = _str.length()+1;
  2879. if (l>len)
  2880. {
  2881. l = len-1;
  2882. str[l] = 0;
  2883. }
  2884. memcpy(str,_str.toCharArray(),l);
  2885. }
  2886. StringBuffer &SocketEndpoint::getUrlStr(StringBuffer &str) const
  2887. {
  2888. getIpText(str);
  2889. if (port)
  2890. str.append(':').append((unsigned)port); // TBD IPv6 put [] on
  2891. return str;
  2892. }
  2893. unsigned SocketEndpoint::hash(unsigned prev) const
  2894. {
  2895. return hashc((const byte *)&port,sizeof(port),iphash(prev));
  2896. }
  2897. //---------------------------------------------------------------------------
  2898. SocketListCreator::SocketListCreator()
  2899. {
  2900. lastPort = 0;
  2901. }
  2902. void SocketListCreator::addSocket(const SocketEndpoint &ep)
  2903. {
  2904. StringBuffer ipstr;
  2905. ep.getIpText(ipstr);
  2906. addSocket(ipstr.str(), ep.port);
  2907. }
  2908. void SocketListCreator::addSocket(const char * ip, unsigned port)
  2909. {
  2910. if (fullText.length())
  2911. fullText.append("|");
  2912. const char * prev = lastIp;
  2913. const char * startCopy = ip;
  2914. if (prev)
  2915. {
  2916. if (strcmp(ip, prev) == 0)
  2917. {
  2918. fullText.append("=");
  2919. startCopy = NULL;
  2920. }
  2921. else
  2922. {
  2923. const char * cur = ip;
  2924. loop
  2925. {
  2926. char n = *cur;
  2927. if (!n)
  2928. break;
  2929. if (n != *prev)
  2930. break;
  2931. cur++;
  2932. prev++;
  2933. if (n == '.')
  2934. startCopy = cur;
  2935. }
  2936. if (startCopy != ip)
  2937. fullText.append("*");
  2938. }
  2939. }
  2940. fullText.append(startCopy);
  2941. if (lastPort != port)
  2942. fullText.append(":").append(port);
  2943. lastIp.set(ip);
  2944. lastPort = port;
  2945. }
  2946. const char * SocketListCreator::getText()
  2947. {
  2948. return fullText.str();
  2949. }
  2950. void SocketListCreator::addSockets(SocketEndpointArray &array)
  2951. {
  2952. ForEachItemIn(i,array) {
  2953. SocketEndpoint &sockep=array.item(i);
  2954. StringBuffer ipstr;
  2955. sockep.getIpText(ipstr);
  2956. addSocket(ipstr.str(),sockep.port);
  2957. }
  2958. }
  2959. //---------------------------------------------------------------------------
  2960. SocketListParser::SocketListParser(const char * text)
  2961. {
  2962. fullText.set(text);
  2963. cursor = NULL;
  2964. lastPort = 0;
  2965. }
  2966. void SocketListParser::first(unsigned port)
  2967. {
  2968. cursor = fullText;
  2969. lastIp.set(NULL);
  2970. lastPort = port;
  2971. }
  2972. bool SocketListParser::get(StringAttr & ip, unsigned & port, unsigned index, unsigned defport)
  2973. {
  2974. first(defport);
  2975. do
  2976. {
  2977. if (!next(ip, port))
  2978. return false;
  2979. } while (index--);
  2980. return true;
  2981. }
// Decodes the next entry of the list text at 'cursor'.
//   ip   - receives the entry's IP text
//   port - receives the entry's port (the previous port when none is given)
// Returns false at the end of the text.  Entries are separated by '|' or ','.
// An entry starting with '=' repeats the previous IP; one starting with '*'
// supplies only the trailing components, the leading ones being copied from
// the previous IP.  first() must have been called to set the cursor.
bool SocketListParser::next(StringAttr & ip, unsigned & port)
{
    // IPV6TBD
    StringBuffer ipText;
    if (*cursor == 0)
        return false;
    if (*cursor == '=')
    {
        // same IP as the previous entry
        ipText.append(lastIp);
        cursor++;
    }
    else if (*cursor == '*')
    {
        cursor++;
        //count the number of dots in the tail
        const char * cur = cursor;
        unsigned count = 0;
        loop
        {
            char c = *cur++;
            switch (c)
            {
            case 0:
            case '|':
            case ',':
            case ':':
                goto done;
            case '.':
                ++count;
                break;
            }
        }
done:
        //copy up to the appropriate dot from the previous ip.
        // 'count' keeps running from the tail's dot count, so copying stops
        // once previous-IP dots plus tail dots reach dotCount.
        const unsigned dotCount = 3;        //more what about 6 digit ip's
        cur = lastIp;
        loop
        {
            char c = *cur++;
            switch (c)
            {
            case 0:
            case '|':
            case ',':
            case ':':
                // previous IP ran out before enough dots were seen
                assertex(!"Should not get here!");
                goto done2;
            case '.':
                ipText.append(c);
                if (++count == dotCount)
                    goto done2;
                break;
            default:
                ipText.append(c);
                break;
            }
        }
done2:;
    }
    // Copy the rest of the entry: characters up to a separator extend the IP
    // text; after a ':' the port is parsed with atoi and the remaining
    // characters of the entry are skipped.
    bool inPort = false;
    port = lastPort;
    loop
    {
        char c = *cursor++;
        switch (c)
        {
        case 0:
            cursor--;       // stay on the terminator so the next call returns false
            goto doneCopy;
        case '|':
        case ',':
            goto doneCopy;
        case ':':
            port = atoi(cursor);
            inPort = true;
            break;;
        default:
            if (!inPort)
                ipText.append(c);
            break;
        }
    }
doneCopy:
    // remember this entry so the next one can be decoded relative to it
    lastIp.set(ipText.str());
    ip.set(lastIp);
    lastPort = port;
    return true;
}
  3070. unsigned SocketListParser::getSockets(SocketEndpointArray &array,unsigned defport)
  3071. {
  3072. first(defport);
  3073. StringAttr ip;
  3074. unsigned port;
  3075. while (next(ip,port)) {
  3076. SocketEndpoint ep(ip,port);
  3077. array.append(ep);
  3078. }
  3079. return array.ordinality();
  3080. }
  3081. void getSocketStatistics(JSocketStatistics &stats)
  3082. {
  3083. // should put in simple lock
  3084. memcpy(&stats,&STATS,sizeof(stats));
  3085. }
  3086. void resetSocketStatistics()
  3087. {
  3088. unsigned activesockets=STATS.activesockets;
  3089. memset(&STATS,0,sizeof(STATS));
  3090. STATS.activesockets = activesockets;
  3091. }
  3092. static StringBuffer &appendtime(StringBuffer &s,unsigned us)
  3093. {
  3094. // attemp to get into more sensible units
  3095. if (us>10000000)
  3096. return s.append(us/1000000).append('s');
  3097. if (us>10000)
  3098. return s.append(us/1000).append("ms");
  3099. return s.append(us).append("us");
  3100. }
  3101. StringBuffer &getSocketStatisticsString(JSocketStatistics &stats,StringBuffer &str)
  3102. {
  3103. str.append("connects=").append(stats.connects).append('\n');
  3104. appendtime(str.append("connecttime="),stats.connecttime).append('\n');
  3105. str.append("failedconnects=").append(stats.failedconnects).append('\n');
  3106. appendtime(str.append("failedconnecttime="),stats.failedconnecttime).append('\n');
  3107. str.append("reads=").append(stats.reads).append('\n');
  3108. appendtime(str.append("readtime="),stats.readtime).append('\n');
  3109. str.append("readsize=").append(stats.readsize).append(" bytes\n");
  3110. str.append("writes=").append(stats.writes).append('\n');
  3111. appendtime(str.append("writetime="),stats.writetime).append('\n');
  3112. str.append("writesize=").append(stats.writesize).append(" bytes").append('\n');
  3113. str.append("activesockets=").append(stats.activesockets).append('\n');
  3114. str.append("numblockrecvs=").append(stats.numblockrecvs).append('\n');
  3115. str.append("numblocksends=").append(stats.numblocksends).append('\n');
  3116. str.append("blockrecvsize=").append(stats.blockrecvsize).append('\n');
  3117. str.append("blocksendsize=").append(stats.blocksendsize).append('\n');
  3118. str.append("blockrecvtime=").append(stats.blockrecvtime).append('\n');
  3119. str.append("blocksendtime=").append(stats.blocksendtime).append('\n');
  3120. str.append("longestblocksend=").append(stats.longestblocksend).append('\n');
  3121. str.append("longestblocksize=").append(stats.longestblocksize);
  3122. return str;
  3123. }
  3124. // ===============================================================================
  3125. // select thread for handling multiple selects
// One entry in the select thread's list of watched sockets.
// Two items compare equal when their 'sock' pointers match (Array__Equal).
// NOTE(review): the field descriptions below are inferred from names only;
// confirm against the select handler code.
struct SelectItem
{
    ISocket *sock;              // socket being watched; identity key of the item
    T_SOCKET handle;            // presumably the OS handle of 'sock'
    ISocketSelectNotify *nfy;   // presumably the callback notified on activity
    byte mode;                  // presumably the select mode(s) requested
    bool del;                   // presumably marks the item for deletion
    bool add_epoll;             // presumably marks the item as pending epoll registration
};
  3135. inline SelectItem &Array__Member2Param(SelectItem &src) { return src; }
  3136. inline void Array__Assign(SelectItem & dest, SelectItem &src) { dest=src; }
  3137. inline bool Array__Equal(SelectItem &m, SelectItem &p) { return m.sock==p.sock; }
  3138. inline void Array__Destroy(SelectItem &p) { }
  3139. class SelectItemArray : public ArrayOf<SelectItem, SelectItem &> { };
  3140. #define SELECT_TIMEOUT_SECS 1 // but it does (TBD)
  3141. #ifdef _WIN32
  3142. // fd_set utility functions
  3143. inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src)
  3144. {
  3145. unsigned i = src.fd_count;
  3146. dst.fd_count = i;
  3147. while (i--)
  3148. dst.fd_array[i] = src.fd_array[i]; // possibly better as memcpy
  3149. return &dst;
  3150. }
  3151. inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
  3152. {
  3153. unsigned n = s.fd_count;
  3154. unsigned i;
  3155. for(i=0;i<n;i++) {
  3156. if (s.fd_array[i] == h) {
  3157. if (--n)
  3158. s.fd_array[i] = s.fd_array[n]; // remove item
  3159. else
  3160. c = false;
  3161. s.fd_count = n;
  3162. return true;
  3163. }
  3164. }
  3165. return false;
  3166. }
  3167. inline T_SOCKET popfds(T_FD_SET &s)
  3168. {
  3169. unsigned n = s.fd_count;
  3170. T_SOCKET ret;
  3171. if (n) {
  3172. ret = s.fd_array[--n];
  3173. s.fd_count = n;
  3174. }
  3175. else
  3176. ret = NULL;
  3177. return ret;
  3178. }
  3179. #else
  3180. #define _USE_PIPE_FOR_SELECT_TRIGGER
  3181. // not as optimized as windows but I am expecting to convert to using poll anyway
  3182. inline T_FD_SET *cpyfds(T_FD_SET &dst,const T_FD_SET &src)
  3183. {
  3184. memcpy(&dst,&src,sizeof(T_FD_SET));
  3185. return &dst;
  3186. }
  3187. inline bool findfds(T_FD_SET &s,T_SOCKET h,bool &c)
  3188. {
  3189. if ((unsigned)h>=XFD_SETSIZE)
  3190. return false;
  3191. return FD_ISSET(h,&s); // does not remove entry or set termination flag when done
  3192. }
  3193. #endif
  3194. class CSocketBaseThread: public Thread
  3195. {
  3196. protected:
  3197. bool terminating;
  3198. CriticalSection sect;
  3199. Semaphore ticksem;
  3200. atomic_t tickwait;
  3201. SelectItemArray items;
  3202. unsigned offset;
  3203. bool selectvarschange;
  3204. unsigned waitingchange;
  3205. Semaphore waitingchangesem;
  3206. int validateselecterror;
  3207. unsigned validateerrcount;
  3208. const char *selecttrace;
  3209. unsigned basesize;
  3210. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3211. T_SOCKET dummysock[2];
  3212. #else
  3213. T_SOCKET dummysock;
  3214. #endif
  3215. bool dummysockopen;
  3216. CSocketBaseThread(const char *trc) : Thread("CSocketBaseThread")
  3217. {
  3218. }
  3219. ~CSocketBaseThread()
  3220. {
  3221. }
  3222. public:
  3223. void triggerselect()
  3224. {
  3225. if (atomic_read(&tickwait))
  3226. ticksem.signal();
  3227. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3228. CriticalBlock block(sect);
  3229. char c = 0;
  3230. if(write(dummysock[1], &c, 1) != 1) {
  3231. int err = ERRNO();
  3232. LOGERR(err,1,"Socket closed during trigger select");
  3233. }
  3234. #else
  3235. closedummy();
  3236. #endif
  3237. }
  3238. void resettrigger()
  3239. {
  3240. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3241. CriticalBlock block(sect);
  3242. char c;
  3243. while((::read(dummysock[0], &c, sizeof(c))) == sizeof(c));
  3244. #endif
  3245. }
  3246. bool remove(ISocket *sock)
  3247. {
  3248. if (terminating)
  3249. return false;
  3250. CriticalBlock block(sect);
  3251. if (sock==NULL) { // wait until no changes outstanding
  3252. while (selectvarschange) {
  3253. waitingchange++;
  3254. CriticalUnblock unblock(sect);
  3255. waitingchangesem.wait();
  3256. }
  3257. return true;
  3258. }
  3259. ForEachItemIn(i,items) {
  3260. SelectItem &si = items.item(i);
  3261. if (!si.del&&(si.sock==sock)) {
  3262. si.del = true;
  3263. selectvarschange = true;
  3264. triggerselect();
  3265. return true;
  3266. }
  3267. }
  3268. return false;
  3269. }
  3270. void stop(bool wait)
  3271. {
  3272. terminating = true;
  3273. triggerselect();
  3274. if (wait)
  3275. join();
  3276. }
  3277. bool sockOk(T_SOCKET sock)
  3278. {
  3279. PROGLOG("CSocketBaseThread: sockOk testing %d",sock);
  3280. int err = 0;
  3281. int t=0;
  3282. socklen_t tl = sizeof(t);
  3283. if (getsockopt(sock, SOL_SOCKET, SO_TYPE, (char *)&t, &tl)!=0) {
  3284. StringBuffer sockstr;
  3285. const char *tracename = sockstr.append((unsigned)sock).str();
  3286. LOGERR2(ERRNO(),1,"CSocketBaseThread select handle");
  3287. return false;
  3288. }
  3289. T_FD_SET fds;
  3290. struct timeval tv;
  3291. XFD_ZERO(&fds);
  3292. FD_SET((unsigned)sock, &fds);
  3293. //FD_SET((unsigned)sock, &except);
  3294. tv.tv_sec = 0;
  3295. tv.tv_usec = 0;
  3296. CHECKSOCKRANGE(sock);
  3297. int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
  3298. if (rc<0) {
  3299. StringBuffer sockstr;
  3300. const char *tracename = sockstr.append((unsigned)sock).str();
  3301. LOGERR2(ERRNO(),2,"CSocketBaseThread select handle");
  3302. return false;
  3303. }
  3304. else if (rc>0)
  3305. PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
  3306. XFD_ZERO(&fds);
  3307. FD_SET((unsigned)sock, &fds);
  3308. tv.tv_sec = 0;
  3309. tv.tv_usec = 0;
  3310. rc = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, &tv );
  3311. if (rc<0) {
  3312. StringBuffer sockstr;
  3313. const char *tracename = sockstr.append((unsigned)sock).str();
  3314. LOGERR2(ERRNO(),3,"CSocketBaseThread select handle");
  3315. return false;
  3316. }
  3317. else if (rc>0)
  3318. PROGLOG("CSocketBaseThread: select handle %d selected(2) %d",sock,rc);
  3319. return true;
  3320. }
  3321. bool checkSocks()
  3322. {
  3323. bool ret = false;
  3324. ForEachItemIn(i,items) {
  3325. SelectItem &si = items.item(i);
  3326. if (si.del)
  3327. ret = true; // maybe that bad one
  3328. else if (!sockOk(si.handle)) {
  3329. si.del = true;
  3330. ret = true;
  3331. }
  3332. }
  3333. return ret;
  3334. }
  3335. };
  3336. class CSocketSelectThread: public CSocketBaseThread
  3337. {
  3338. void opendummy()
  3339. {
  3340. CriticalBlock block(sect);
  3341. if (!dummysockopen) {
  3342. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3343. if(pipe(dummysock)) {
  3344. WARNLOG("CSocketSelectThread: create pipe failed %d",ERRNO());
  3345. return;
  3346. }
  3347. for (unsigned i=0;i<2;i++) {
  3348. int flags = fcntl(dummysock[i], F_GETFL, 0);
  3349. if (flags!=-1) {
  3350. flags |= O_NONBLOCK;
  3351. fcntl(dummysock[i], F_SETFL, flags);
  3352. }
  3353. flags = fcntl(dummysock[i], F_GETFD, 0);
  3354. if (flags!=-1) {
  3355. flags |= FD_CLOEXEC;
  3356. fcntl(dummysock[i], F_SETFD, flags);
  3357. }
  3358. }
  3359. CHECKSOCKRANGE(dummysock[0]);
  3360. #else
  3361. if (IP6preferred)
  3362. dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
  3363. else
  3364. dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
  3365. CHECKSOCKRANGE(dummysock);
  3366. #endif
  3367. dummysockopen = true;
  3368. }
  3369. }
  3370. void closedummy()
  3371. {
  3372. CriticalBlock block(sect);
  3373. if (dummysockopen) {
  3374. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3375. #ifdef SOCKTRACE
  3376. PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%x)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
  3377. #endif
  3378. ::close(dummysock[0]);
  3379. ::close(dummysock[1]);
  3380. #else
  3381. #ifdef _WIN32
  3382. ::closesocket(dummysock);
  3383. #else
  3384. ::close(dummysock);
  3385. #endif
  3386. #endif
  3387. dummysockopen = false;
  3388. }
  3389. }
  3390. #ifdef _WIN32
  3391. #define HASHTABSIZE 256
  3392. #define HASHNULL (HASHTABSIZE-1)
  3393. #define HASHTABMASK (HASHTABSIZE-1)
  3394. byte hashtab[HASHTABSIZE];
  3395. #define HASHSOCKET(s) ((((unsigned)s)>>2)&HASHTABMASK) // with some knowledge of windows handles
  3396. void inithash()
  3397. {
  3398. memset(&hashtab,HASHNULL,sizeof(hashtab));
  3399. assertex(FD_SETSIZE<255);
  3400. }
  3401. void reinithash()
  3402. { // done this way because index of items changes and hash table not that big
  3403. inithash();
  3404. assertex(items.ordinality()<HASHTABSIZE-1);
  3405. ForEachItemIn(i,items) {
  3406. unsigned h = HASHSOCKET(items.item(i).handle);
  3407. loop {
  3408. if (hashtab[h]==HASHNULL) {
  3409. hashtab[h] = (byte)i;
  3410. break;
  3411. }
  3412. if (++h==HASHTABSIZE)
  3413. h = 0;
  3414. }
  3415. }
  3416. }
  3417. inline SelectItem &findhash(T_SOCKET handle)
  3418. {
  3419. unsigned h = HASHSOCKET(handle);
  3420. unsigned sh = h;
  3421. loop {
  3422. SelectItem &i=items.item(hashtab[h]);
  3423. if (i.handle==handle)
  3424. return i;
  3425. if (++h==HASHTABSIZE)
  3426. h = 0;
  3427. assertex(h!=sh);
  3428. }
  3429. }
  3430. inline void processfds(T_FD_SET &s,byte mode,SelectItemArray &tonotify)
  3431. {
  3432. loop {
  3433. T_SOCKET sock = popfds(s);
  3434. if (!sock)
  3435. break;
  3436. if (sock!=dummysock) {
  3437. SelectItem si = findhash(sock); // nb copies
  3438. if (!si.del) {
  3439. si.mode = mode;
  3440. tonotify.append(si);
  3441. }
  3442. }
  3443. }
  3444. }
  3445. #endif
  3446. public:
  3447. IMPLEMENT_IINTERFACE;
  3448. CSocketSelectThread(const char *trc)
  3449. : CSocketBaseThread("CSocketSelectThread")
  3450. {
  3451. dummysockopen = false;
  3452. opendummy();
  3453. terminating = false;
  3454. atomic_set(&tickwait,0);
  3455. waitingchange = 0;
  3456. selectvarschange = false;
  3457. validateselecterror = 0;
  3458. validateerrcount = 0;
  3459. offset = 0;
  3460. selecttrace = trc;
  3461. basesize = 0;
  3462. #ifdef _WIN32
  3463. inithash();
  3464. #endif
  3465. }
  3466. ~CSocketSelectThread()
  3467. {
  3468. closedummy();
  3469. ForEachItemIn(i,items) {
  3470. try {
  3471. SelectItem &si = items.item(i);
  3472. si.sock->Release();
  3473. si.nfy->Release();
  3474. }
  3475. catch (IException *e) {
  3476. EXCLOG(e,"~CSocketSelectThread");
  3477. e->Release();
  3478. }
  3479. }
  3480. }
  3481. Owned<IException> termexcept;
  3482. void updateItems()
  3483. {
  3484. // must be in CriticalBlock block(sect);
  3485. unsigned n = items.ordinality();
  3486. bool hashupdateneeded = (n!=basesize); // additions all come at end
  3487. for (unsigned i=0;i<n;) {
  3488. SelectItem &si = items.item(i);
  3489. if (si.del) {
  3490. si.nfy->Release();
  3491. try {
  3492. #ifdef SOCKTRACE
  3493. PROGLOG("CSocketSelectThread::updateItems release %d",si.handle);
  3494. #endif
  3495. si.sock->Release();
  3496. }
  3497. catch (IException *e) {
  3498. EXCLOG(e,"CSocketSelectThread::updateItems");
  3499. e->Release();
  3500. }
  3501. n--;
  3502. if (i<n)
  3503. si = items.item(n);
  3504. items.remove(n);
  3505. hashupdateneeded = true;
  3506. }
  3507. else
  3508. i++;
  3509. }
  3510. assertex(n<=XFD_SETSIZE-1);
  3511. #ifdef _WIN32
  3512. if (hashupdateneeded)
  3513. reinithash();
  3514. #endif
  3515. basesize = n;
  3516. }
  3517. bool add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  3518. {
  3519. // maybe check once to prevent 1st delay? TBD
  3520. CriticalBlock block(sect);
  3521. unsigned n=0;
  3522. ForEachItemIn(i,items) {
  3523. SelectItem &si = items.item(i);
  3524. if (!si.del) {
  3525. if (si.sock==sock) {
  3526. si.del = true;
  3527. }
  3528. else
  3529. n++;
  3530. }
  3531. }
  3532. if (n>=XFD_SETSIZE-1) // leave 1 spare
  3533. return false;
  3534. SelectItem sn;
  3535. sn.nfy = LINK(nfy);
  3536. sn.sock = LINK(sock);
  3537. sn.mode = (byte)mode;
  3538. sn.handle = (T_SOCKET)sock->OShandle();
  3539. CHECKSOCKRANGE(sn.handle);
  3540. sn.del = false;
  3541. sn.add_epoll = false;
  3542. items.append(sn);
  3543. selectvarschange = true;
  3544. triggerselect();
  3545. return true;
  3546. }
  3547. void updateSelectVars(T_FD_SET &rdfds,T_FD_SET &wrfds,T_FD_SET &exfds,bool &isrd,bool &iswr,bool &isex,unsigned &ni,T_SOCKET &max_sockid)
  3548. {
  3549. CriticalBlock block(sect);
  3550. selectvarschange = false;
  3551. if (waitingchange) {
  3552. waitingchangesem.signal(waitingchange);
  3553. waitingchange = 0;
  3554. }
  3555. if (validateselecterror) { // something went wrong so check sockets
  3556. validateerrcount++;
  3557. if (!checkSocks()) {
  3558. // bad socket not found
  3559. PROGLOG("CSocketSelectThread::updateSelectVars cannot find socket error");
  3560. if (validateerrcount>10)
  3561. throw MakeStringException(-1,"CSocketSelectThread:Socket select error %d",validateselecterror);
  3562. }
  3563. }
  3564. else
  3565. validateerrcount = 0;
  3566. updateItems();
  3567. XFD_ZERO( &rdfds );
  3568. XFD_ZERO( &wrfds );
  3569. XFD_ZERO( &exfds );
  3570. isrd=false;
  3571. iswr=false;
  3572. isex=false;
  3573. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3574. max_sockid=dummysockopen?dummysock[0]:0;
  3575. #else
  3576. opendummy();
  3577. max_sockid=dummysockopen?dummysock:0;
  3578. #endif
  3579. ni = items.ordinality();
  3580. #ifdef _WIN32
  3581. if (offset>=ni)
  3582. #endif
  3583. offset = 0;
  3584. unsigned j=offset;
  3585. ForEachItemIn(i,items) {
  3586. SelectItem &si = items.item(j);
  3587. j++;
  3588. if (j==ni)
  3589. j = 0;
  3590. if (si.mode & SELECTMODE_READ) {
  3591. FD_SET( si.handle, &rdfds );
  3592. isrd = true;
  3593. }
  3594. if (si.mode & SELECTMODE_WRITE) {
  3595. FD_SET( si.handle, &wrfds );
  3596. iswr = true;
  3597. }
  3598. if (si.mode & SELECTMODE_EXCEPT) {
  3599. FD_SET( si.handle, &exfds );
  3600. isex = true;
  3601. }
  3602. max_sockid=std::max(si.handle, max_sockid);
  3603. }
  3604. if (dummysockopen) {
  3605. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3606. FD_SET( dummysock[0], &rdfds );
  3607. isrd = true;
  3608. #else
  3609. FD_SET( dummysock, &exfds );
  3610. isex = true;
  3611. #endif
  3612. }
  3613. validateselecterror = 0;
  3614. max_sockid++;
  3615. #ifdef SOCKTRACE
  3616. PROGLOG("SOCKTRACE: selecting on %d sockets",ni);
  3617. #endif
  3618. }
  3619. int run()
  3620. {
  3621. try {
  3622. T_FD_SET rdfds;
  3623. T_FD_SET wrfds;
  3624. T_FD_SET exfds;
  3625. timeval selecttimeout;
  3626. bool isrd = false;
  3627. bool iswr = false;
  3628. bool isex = false;
  3629. T_SOCKET maxsockid = 0;
  3630. unsigned ni = 0;
  3631. selectvarschange = true;
  3632. unsigned numto = 0;
  3633. unsigned lastnumto = 0;
  3634. unsigned totnum = 0;
  3635. unsigned total = 0;
  3636. while (!terminating) {
  3637. selecttimeout.tv_sec = SELECT_TIMEOUT_SECS; // linux modifies so initialize inside loop
  3638. selecttimeout.tv_usec = 0;
  3639. if (selectvarschange) {
  3640. updateSelectVars(rdfds,wrfds,exfds,isrd,iswr,isex,ni,maxsockid);
  3641. }
  3642. if (ni==0) {
  3643. validateerrcount = 0;
  3644. atomic_inc(&tickwait);
  3645. if(!selectvarschange&&!terminating)
  3646. ticksem.wait(SELECT_TIMEOUT_SECS*1000);
  3647. atomic_dec(&tickwait);
  3648. continue;
  3649. }
  3650. T_FD_SET rs;
  3651. T_FD_SET ws;
  3652. T_FD_SET es;
  3653. T_FD_SET *rsp = isrd?cpyfds(rs,rdfds):NULL;
  3654. T_FD_SET *wsp = iswr?cpyfds(ws,wrfds):NULL;
  3655. T_FD_SET *esp = isex?cpyfds(es,exfds):NULL;
  3656. int n = ::select(maxsockid,(fd_set *)rsp,(fd_set *)wsp,(fd_set *)esp,&selecttimeout); // first parameter needed for posix
  3657. if (terminating)
  3658. break;
  3659. if (n < 0) {
  3660. CriticalBlock block(sect);
  3661. int err = ERRNO();
  3662. if (err != EINTRCALL) {
  3663. if (dummysockopen) {
  3664. LOGERR(err,12,"CSocketSelectThread select error"); // should cache error ?
  3665. validateselecterror = err;
  3666. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  3667. closedummy(); // just in case was culprit
  3668. #endif
  3669. }
  3670. selectvarschange = true;
  3671. continue;
  3672. }
  3673. n = 0;
  3674. }
  3675. else if (n>0) {
  3676. validateerrcount = 0;
  3677. numto = 0;
  3678. lastnumto = 0;
  3679. total += n;
  3680. totnum++;
  3681. SelectItemArray tonotify;
  3682. {
  3683. CriticalBlock block(sect);
  3684. #ifdef _WIN32
  3685. if (isrd)
  3686. processfds(rs,SELECTMODE_READ,tonotify);
  3687. if (iswr)
  3688. processfds(ws,SELECTMODE_WRITE,tonotify);
  3689. if (isex)
  3690. processfds(es,SELECTMODE_EXCEPT,tonotify);
  3691. #else
  3692. unsigned i;
  3693. SelectItem *si = items.getArray(offset);
  3694. SelectItem *sie = items.getArray(ni-1)+1;
  3695. bool r = isrd;
  3696. bool w = iswr;
  3697. bool e = isex;
  3698. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3699. if (r&&dummysockopen&&findfds(rs,dummysock[0],r)) {
  3700. resettrigger();
  3701. --n;
  3702. }
  3703. #endif
  3704. for (i=0;(n>0)&&(i<ni);i++) {
  3705. if (r&&findfds(rs,si->handle,r)) {
  3706. if (!si->del) {
  3707. tonotify.append(*si);
  3708. tonotify.item(tonotify.length()-1).mode = SELECTMODE_READ;
  3709. }
  3710. --n;
  3711. }
  3712. if (w&&findfds(ws,si->handle,w)) {
  3713. if (!si->del) {
  3714. tonotify.append(*si);
  3715. tonotify.item(tonotify.length()-1).mode = SELECTMODE_WRITE;
  3716. }
  3717. --n;
  3718. }
  3719. if (e&&findfds(es,si->handle,e)) {
  3720. if (!si->del) {
  3721. tonotify.append(*si);
  3722. tonotify.item(tonotify.length()-1).mode = SELECTMODE_EXCEPT;
  3723. }
  3724. --n;
  3725. }
  3726. si++;
  3727. if (si==sie)
  3728. si = items.getArray();
  3729. }
  3730. #endif
  3731. }
  3732. ForEachItemIn(j,tonotify) {
  3733. SelectItem &si = tonotify.item(j);
  3734. try {
  3735. si.nfy->notifySelected(si.sock,si.mode); // ignore return
  3736. }
  3737. catch (IException *e) { // should be acted upon by notifySelected
  3738. EXCLOG(e,"CSocketSelectThread notifySelected");
  3739. throw ;
  3740. }
  3741. }
  3742. }
  3743. else {
  3744. validateerrcount = 0;
  3745. if ((++numto>=lastnumto*2)) {
  3746. lastnumto = numto;
  3747. if (selecttrace&&(numto>4))
  3748. PROGLOG("%s: Select Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0);
  3749. }
  3750. /*
  3751. if (numto&&(numto%100)) {
  3752. CriticalBlock block(sect);
  3753. if (!selectvarschange)
  3754. selectvarschange = checkSocks();
  3755. }
  3756. */
  3757. }
  3758. if (++offset>=ni)
  3759. offset = 0;
  3760. }
  3761. }
  3762. catch (IException *e) {
  3763. EXCLOG(e,"CSocketSelectThread");
  3764. termexcept.setown(e);
  3765. }
  3766. CriticalBlock block(sect);
  3767. try {
  3768. updateItems();
  3769. }
  3770. catch (IException *e) {
  3771. EXCLOG(e,"CSocketSelectThread(2)");
  3772. if (!termexcept)
  3773. termexcept.setown(e);
  3774. else
  3775. e->Release();
  3776. }
  3777. return 0;
  3778. }
  3779. };
  3780. class CSocketSelectHandler: public CInterface, implements ISocketSelectHandler
  3781. {
  3782. CIArrayOf<CSocketSelectThread> threads;
  3783. CriticalSection sect;
  3784. bool started;
  3785. StringAttr selecttrace;
  3786. public:
  3787. IMPLEMENT_IINTERFACE;
  3788. CSocketSelectHandler(const char *trc)
  3789. : selecttrace(trc)
  3790. {
  3791. started = false;
  3792. }
  3793. void start()
  3794. {
  3795. CriticalBlock block(sect);
  3796. if (!started) {
  3797. started = true;
  3798. ForEachItemIn(i,threads) {
  3799. threads.item(i).start();
  3800. }
  3801. }
  3802. }
  3803. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  3804. {
  3805. CriticalBlock block(sect);
  3806. loop {
  3807. bool added=false;
  3808. ForEachItemIn(i,threads) {
  3809. if (added)
  3810. threads.item(i).remove(sock);
  3811. else
  3812. added = threads.item(i).add(sock,mode,nfy);
  3813. }
  3814. if (added)
  3815. return;
  3816. CSocketSelectThread *thread = new CSocketSelectThread(selecttrace);
  3817. threads.append(*thread);
  3818. if (started)
  3819. thread->start();
  3820. }
  3821. }
  3822. void remove(ISocket *sock)
  3823. {
  3824. CriticalBlock block(sect);
  3825. ForEachItemIn(i,threads) {
  3826. if (threads.item(i).remove(sock)&&sock)
  3827. break;
  3828. }
  3829. }
  3830. void stop(bool wait)
  3831. {
  3832. IException *e=NULL;
  3833. CriticalBlock block(sect);
  3834. unsigned i = 0;
  3835. while (i<threads.ordinality()) {
  3836. CSocketSelectThread &t=threads.item(i);
  3837. {
  3838. CriticalUnblock unblock(sect);
  3839. t.stop(wait); // not quite as quick as could be if wait true
  3840. }
  3841. if (wait && !e && t.termexcept)
  3842. e = t.termexcept.getClear();
  3843. i++;
  3844. }
  3845. #if 0 // don't throw error as too late
  3846. if (e)
  3847. throw e;
  3848. #else
  3849. ::Release(e);
  3850. #endif
  3851. }
  3852. };
  3853. #ifdef _HAS_EPOLL_SUPPORT
  3854. class CSocketEpollThread: public CSocketBaseThread
  3855. {
  3856. int epfd;
  3857. int *epfdtbl; // table of fd<->item index for lookups
  3858. struct epoll_event *epevents;
  3859. void epoll_op(int efd, int op, int fd, unsigned int event_mask)
  3860. {
  3861. int srtn;
  3862. struct epoll_event event;
  3863. event.events = event_mask;
  3864. event.data.fd = fd;
  3865. # ifdef EPOLLTRACE
  3866. DBGLOG("EPOLL: op(%d) fd %d to epfd %d", op, fd, efd);
  3867. # endif
  3868. srtn = ::epoll_ctl(efd, op, fd, &event);
  3869. // if another thread closed fd before here don't fail
  3870. if ( (srtn < 0) && (op != EPOLL_CTL_DEL) ){
  3871. int err = ERRNO();
  3872. LOGERR(err,1,"epoll_ctl");
  3873. THROWJSOCKEXCEPTION2(err);
  3874. }
  3875. }
  3876. void opendummy()
  3877. {
  3878. CriticalBlock block(sect);
  3879. if (!dummysockopen) {
  3880. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3881. if(pipe(dummysock)) {
  3882. WARNLOG("CSocketEpollThread: create pipe failed %d",ERRNO());
  3883. return;
  3884. }
  3885. for (unsigned i=0;i<2;i++) {
  3886. int flags = fcntl(dummysock[i], F_GETFL, 0);
  3887. if (flags!=-1) {
  3888. flags |= O_NONBLOCK;
  3889. fcntl(dummysock[i], F_SETFL, flags);
  3890. }
  3891. flags = fcntl(dummysock[i], F_GETFD, 0);
  3892. if (flags!=-1) {
  3893. flags |= FD_CLOEXEC;
  3894. fcntl(dummysock[i], F_SETFD, flags);
  3895. }
  3896. }
  3897. CHECKSOCKRANGE(dummysock[0]);
  3898. epoll_op(epfd, EPOLL_CTL_ADD, dummysock[0], EPOLLIN);
  3899. #else
  3900. if (IP6preferred)
  3901. dummysock = ::socket(AF_INET6, SOCK_STREAM, PF_INET6);
  3902. else
  3903. dummysock = ::socket(AF_INET, SOCK_STREAM, 0);
  3904. CHECKSOCKRANGE(dummysock);
  3905. epoll_op(epfd, EPOLL_CTL_ADD, dummysock, (EPOLLIN | EPOLLERR));
  3906. #endif
  3907. dummysockopen = true;
  3908. }
  3909. }
  3910. void closedummy()
  3911. {
  3912. CriticalBlock block(sect);
  3913. if (dummysockopen) {
  3914. #ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  3915. epoll_op(epfd, EPOLL_CTL_DEL, dummysock[0], 0);
  3916. #ifdef SOCKTRACE
  3917. PROGLOG("SOCKTRACE: Closing dummy sockets %x %d %x %d (%x)", dummysock[0], dummysock[0], dummysock[1], dummysock[1], this);
  3918. #endif
  3919. ::close(dummysock[0]);
  3920. ::close(dummysock[1]);
  3921. #else
  3922. epoll_op(epfd, EPOLL_CTL_DEL, dummysock, 0);
  3923. ::close(dummysock);
  3924. #endif
  3925. dummysockopen = false;
  3926. }
  3927. }
  3928. public:
  3929. IMPLEMENT_IINTERFACE;
  3930. CSocketEpollThread(const char *trc)
  3931. : CSocketBaseThread("CSocketEpollThread")
  3932. {
  3933. dummysockopen = false;
  3934. terminating = false;
  3935. atomic_set(&tickwait,0);
  3936. waitingchange = 0;
  3937. selectvarschange = false;
  3938. validateselecterror = 0;
  3939. validateerrcount = 0;
  3940. offset = 0;
  3941. selecttrace = trc;
  3942. epfd = ::epoll_create(XFD_SETSIZE);
  3943. if (epfd < 0) {
  3944. int err = ERRNO();
  3945. LOGERR(err,1,"epoll_create()");
  3946. THROWJSOCKEXCEPTION2(err);
  3947. }
  3948. # if defined(_DEBUG) || defined(EPOLLTRACE)
  3949. DBGLOG("CSocketEpollThread: creating epoll fd %d", epfd );
  3950. # endif
  3951. try {
  3952. epfdtbl = new int[XFD_SETSIZE];
  3953. } catch (const std::bad_alloc &e) {
  3954. int err = ERRNO();
  3955. LOGERR(err,1,"epfdtbl alloc()");
  3956. THROWJSOCKEXCEPTION2(err);
  3957. }
  3958. for (int i=0; i<XFD_SETSIZE; i++) {
  3959. epfdtbl[i] = -1;
  3960. }
  3961. try {
  3962. epevents = new struct epoll_event[XFD_SETSIZE];
  3963. } catch (const std::bad_alloc &e) {
  3964. int err = ERRNO();
  3965. LOGERR(err,1,"epevents alloc()");
  3966. THROWJSOCKEXCEPTION2(err);
  3967. }
  3968. opendummy();
  3969. }
  3970. ~CSocketEpollThread()
  3971. {
  3972. closedummy();
  3973. ForEachItemIn(i,items) {
  3974. try {
  3975. SelectItem &si = items.item(i);
  3976. epoll_op(epfd, EPOLL_CTL_DEL, si.handle, 0);
  3977. si.sock->Release();
  3978. si.nfy->Release();
  3979. }
  3980. catch (IException *e) {
  3981. EXCLOG(e,"~CSocketEpollThread");
  3982. e->Release();
  3983. }
  3984. }
  3985. if (epfd >= 0) {
  3986. # ifdef EPOLLTRACE
  3987. DBGLOG("EPOLL: closing epfd %d", epfd );
  3988. # endif
  3989. ::close(epfd);
  3990. epfd = -1;
  3991. delete [] epfdtbl;
  3992. delete [] epevents;
  3993. }
  3994. }
  3995. Owned<IException> termexcept;
  3996. void updateItems()
  3997. {
  3998. // must be in CriticalBlock block(sect);
  3999. unsigned n = items.ordinality();
  4000. bool reindex = false;
  4001. for (unsigned i=0;i<n;) {
  4002. SelectItem &si = items.item(i);
  4003. if (si.add_epoll) {
  4004. reindex = true;
  4005. }
  4006. if (si.del) {
  4007. epoll_op(epfd, EPOLL_CTL_DEL, si.handle, 0);
  4008. epfdtbl[si.handle] = -1;
  4009. reindex = true;
  4010. si.nfy->Release();
  4011. try {
  4012. #ifdef SOCKTRACE
  4013. PROGLOG("CSocketEpollThread::updateItems release %d",si.handle);
  4014. #endif
  4015. si.sock->Release();
  4016. }
  4017. catch (IException *e) {
  4018. EXCLOG(e,"CSocketEpollThread::updateItems");
  4019. e->Release();
  4020. }
  4021. n--;
  4022. if (i<n)
  4023. si = items.item(n);
  4024. items.remove(n);
  4025. }
  4026. else
  4027. i++;
  4028. }
  4029. assertex(n<=XFD_SETSIZE-1);
  4030. if (reindex) {
  4031. # ifdef EPOLLTRACE
  4032. int max_sockid = 0;
  4033. # endif
  4034. ForEachItemIn(j,items) {
  4035. SelectItem &si = items.item(j);
  4036. epfdtbl[si.handle] = j;
  4037. if (si.add_epoll) {
  4038. si.add_epoll = false;
  4039. int srtn, ep_mode;
  4040. struct epoll_event event;
  4041. if (si.mode != 0) {
  4042. ep_mode = 0;
  4043. if (si.mode & SELECTMODE_READ) {
  4044. ep_mode |= (EPOLLIN | EPOLLPRI);
  4045. }
  4046. if (si.mode & SELECTMODE_WRITE) {
  4047. ep_mode |= EPOLLOUT;
  4048. }
  4049. if (si.mode & SELECTMODE_EXCEPT) {
  4050. ep_mode |= EPOLLERR;
  4051. }
  4052. if (ep_mode != 0) {
  4053. ep_mode |= EPOLLRDHUP;
  4054. epoll_op(epfd, EPOLL_CTL_ADD, si.handle, ep_mode);
  4055. }
  4056. }
  4057. # ifdef EPOLLTRACE
  4058. max_sockid=std::max(si.handle, max_sockid);
  4059. # endif
  4060. }
  4061. # ifdef EPOLLTRACE
  4062. for(int ix=0; ix<=max_sockid; ix++) {
  4063. DBGLOG("EPOLL: epfdtbl[%d] = %d", ix, epfdtbl[ix]);
  4064. }
  4065. # endif
  4066. }
  4067. # ifdef EPOLLTRACE
  4068. DBGLOG("EPOLL: leaving updateItems(), reindex = %d", reindex);
  4069. # endif
  4070. }
  4071. }
  4072. bool add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4073. {
  4074. // maybe check once to prevent 1st delay? TBD
  4075. CriticalBlock block(sect);
  4076. unsigned n=0;
  4077. ForEachItemIn(i,items) {
  4078. SelectItem &si = items.item(i);
  4079. if (!si.del) {
  4080. if (si.sock==sock) {
  4081. si.del = true;
  4082. }
  4083. else
  4084. n++;
  4085. }
  4086. }
  4087. if (n>=XFD_SETSIZE-1) // leave 1 spare
  4088. return false;
  4089. SelectItem sn;
  4090. sn.nfy = LINK(nfy);
  4091. sn.sock = LINK(sock);
  4092. sn.mode = (byte)mode;
  4093. sn.handle = (T_SOCKET)sock->OShandle();
  4094. CHECKSOCKRANGE(sn.handle);
  4095. sn.del = false;
  4096. sn.add_epoll = true;
  4097. items.append(sn);
  4098. selectvarschange = true;
  4099. triggerselect();
  4100. return true;
  4101. }
  4102. void updateEpollVars(unsigned &ni)
  4103. {
  4104. CriticalBlock block(sect);
  4105. selectvarschange = false;
  4106. if (waitingchange) {
  4107. waitingchangesem.signal(waitingchange);
  4108. waitingchange = 0;
  4109. }
  4110. if (validateselecterror) { // something went wrong so check sockets
  4111. validateerrcount++;
  4112. if (!checkSocks()) {
  4113. // bad socket not found
  4114. PROGLOG("CSocketEpollThread::updateEpollVars cannot find socket error");
  4115. if (validateerrcount>10)
  4116. throw MakeStringException(-1,"CSocketEpollThread:Socket epoll error %d",validateselecterror);
  4117. }
  4118. }
  4119. else
  4120. validateerrcount = 0;
  4121. updateItems();
  4122. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4123. opendummy();
  4124. #endif
  4125. ni = items.ordinality();
  4126. validateselecterror = 0;
  4127. }
  4128. int run()
  4129. {
  4130. try {
  4131. unsigned ni = 0;
  4132. unsigned numto = 0;
  4133. unsigned lastnumto = 0;
  4134. unsigned totnum = 0;
  4135. unsigned total = 0;
  4136. selectvarschange = true;
  4137. while (!terminating) {
  4138. if (selectvarschange) {
  4139. updateEpollVars(ni);
  4140. }
  4141. if (ni==0) {
  4142. validateerrcount = 0;
  4143. atomic_inc(&tickwait);
  4144. if(!selectvarschange&&!terminating)
  4145. ticksem.wait(SELECT_TIMEOUT_SECS*1000);
  4146. atomic_dec(&tickwait);
  4147. continue;
  4148. }
  4149. int n = ::epoll_wait(epfd, epevents, XFD_SETSIZE, 1000);
  4150. # ifdef EPOLLTRACE
  4151. if(n > 0)
  4152. DBGLOG("EPOLL: after epoll_wait(), n = %d, ni = %d", n, ni);
  4153. # endif
  4154. if (terminating)
  4155. break;
  4156. if (n < 0) {
  4157. CriticalBlock block(sect);
  4158. int err = ERRNO();
  4159. if (err != EINTRCALL) {
  4160. if (dummysockopen) {
  4161. LOGERR(err,12,"CSocketEpollThread epoll error"); // should cache error ?
  4162. validateselecterror = err;
  4163. #ifndef _USE_PIPE_FOR_SELECT_TRIGGER
  4164. closedummy(); // just in case was culprit
  4165. #endif
  4166. }
  4167. selectvarschange = true;
  4168. continue;
  4169. }
  4170. n = 0;
  4171. }
  4172. else if (n>0) {
  4173. validateerrcount = 0;
  4174. numto = 0;
  4175. lastnumto = 0;
  4176. total += n;
  4177. totnum++;
  4178. SelectItemArray tonotify;
  4179. {
  4180. CriticalBlock block(sect);
  4181. for (int j=0;j<n;j++) {
  4182. # ifdef EPOLLTRACE
  4183. DBGLOG("EPOLL: epevents[%d].data.fd = %d, epfdtbl = %d, emask = %d", j, epevents[j].data.fd, epfdtbl[epevents[j].data.fd], epevents[j].events);
  4184. # endif
  4185. # ifdef _USE_PIPE_FOR_SELECT_TRIGGER
  4186. if ((dummysockopen) && (epevents[j].data.fd == dummysock[0])) {
  4187. resettrigger();
  4188. continue;
  4189. }
  4190. # endif
  4191. if (epevents[j].data.fd >= 0) {
  4192. assertex(epfdtbl[epevents[j].data.fd] >= 0);
  4193. SelectItem *epsi = items.getArray(epfdtbl[epevents[j].data.fd]);
  4194. if (!epsi->del) {
  4195. unsigned int ep_mode = 0;
  4196. if (epevents[j].events & (EPOLLIN | EPOLLPRI)) {
  4197. ep_mode |= SELECTMODE_READ;
  4198. }
  4199. if (epevents[j].events & (EPOLLERR | EPOLLHUP)) {
  4200. ep_mode |= SELECTMODE_READ;
  4201. }
  4202. if (epevents[j].events & EPOLLRDHUP) {
  4203. // TODO - or should we set EXCEPT ?
  4204. ep_mode |= SELECTMODE_READ;
  4205. }
  4206. if (epevents[j].events & EPOLLOUT) {
  4207. ep_mode |= SELECTMODE_WRITE;
  4208. }
  4209. if (ep_mode != 0) {
  4210. tonotify.append(*epsi);
  4211. tonotify.item(tonotify.length()-1).mode = ep_mode;
  4212. }
  4213. }
  4214. }
  4215. }
  4216. }
  4217. ForEachItemIn(j,tonotify) {
  4218. SelectItem &si = tonotify.item(j);
  4219. try {
  4220. si.nfy->notifySelected(si.sock,si.mode); // ignore return
  4221. }
  4222. catch (IException *e) { // should be acted upon by notifySelected
  4223. EXCLOG(e,"CSocketEpollThread notifySelected");
  4224. throw ;
  4225. }
  4226. }
  4227. }
  4228. else {
  4229. validateerrcount = 0;
  4230. if ((++numto>=lastnumto*2)) {
  4231. lastnumto = numto;
  4232. if (selecttrace&&(numto>4))
  4233. PROGLOG("%s: Epoll Idle(%d), %d,%d,%0.2f",selecttrace,numto,totnum,total,totnum?((double)total/(double)totnum):0.0);
  4234. }
  4235. /*
  4236. if (numto&&(numto%100)) {
  4237. CriticalBlock block(sect);
  4238. if (!selectvarschange)
  4239. selectvarschange = checkSocks();
  4240. }
  4241. */
  4242. }
  4243. }
  4244. }
  4245. catch (IException *e) {
  4246. EXCLOG(e,"CSocketEpollThread");
  4247. termexcept.setown(e);
  4248. }
  4249. CriticalBlock block(sect);
  4250. try {
  4251. updateItems();
  4252. }
  4253. catch (IException *e) {
  4254. EXCLOG(e,"CSocketEpollThread(2)");
  4255. if (!termexcept)
  4256. termexcept.setown(e);
  4257. else
  4258. e->Release();
  4259. }
  4260. return 0;
  4261. }
  4262. };
  4263. class CSocketEpollHandler: public CInterface, implements ISocketSelectHandler
  4264. {
  4265. CSocketEpollThread *epollthread;
  4266. CriticalSection sect;
  4267. bool started;
  4268. StringAttr epolltrace;
  4269. public:
  4270. IMPLEMENT_IINTERFACE;
  4271. CSocketEpollHandler(const char *trc)
  4272. : epolltrace(trc)
  4273. {
  4274. epollthread = new CSocketEpollThread(epolltrace);
  4275. }
  4276. ~CSocketEpollHandler()
  4277. {
  4278. delete epollthread;
  4279. }
  4280. void start()
  4281. {
  4282. CriticalBlock block(sect);
  4283. epollthread->start();
  4284. }
  4285. void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
  4286. {
  4287. CriticalBlock block(sect);
  4288. epollthread->add(sock,mode,nfy);
  4289. }
  4290. void remove(ISocket *sock)
  4291. {
  4292. CriticalBlock block(sect);
  4293. epollthread->remove(sock);
  4294. }
  4295. void stop(bool wait)
  4296. {
  4297. IException *e=NULL;
  4298. epollthread->stop(wait); // not quite as quick as could be if wait true
  4299. if (wait && !e && epollthread->termexcept)
  4300. e = epollthread->termexcept.getClear();
  4301. #if 0 // don't throw error as too late
  4302. if (e)
  4303. throw e;
  4304. #else
  4305. ::Release(e);
  4306. #endif
  4307. }
  4308. };
  4309. #endif // _HAS_EPOLL_SUPPORT
  4310. enum EpollMethod { EPOLL_INIT = 0, EPOLL_DISABLED, EPOLL_ENABLED };
  4311. static EpollMethod epoll_method = EPOLL_INIT;
  4312. static CriticalSection epollsect;
  4313. ISocketSelectHandler *createSocketSelectHandler(const char *trc)
  4314. {
  4315. #ifdef _HAS_EPOLL_SUPPORT
  4316. {
  4317. CriticalBlock block(epollsect);
  4318. // DBGLOG("createSocketSelectHandler(): epoll_method = %d",epoll_method);
  4319. if (epoll_method == EPOLL_INIT) {
  4320. Owned<IProperties> conf = createProperties(CONFIG_DIR PATHSEPSTR "environment.conf", true);
  4321. if (conf->getPropBool("use_epoll", true)) {
  4322. epoll_method = EPOLL_ENABLED;
  4323. } else {
  4324. epoll_method = EPOLL_DISABLED;
  4325. }
  4326. // DBGLOG("createSocketSelectHandler(): after reading conf file, epoll_method = %d",epoll_method);
  4327. }
  4328. }
  4329. if (epoll_method == EPOLL_ENABLED)
  4330. return new CSocketEpollHandler(trc);
  4331. else
  4332. return new CSocketSelectHandler(trc);
  4333. #else
  4334. return new CSocketSelectHandler(trc);
  4335. #endif
  4336. }
  4337. ISocketSelectHandler *createSocketEpollHandler(const char *trc)
  4338. {
  4339. #ifdef _HAS_EPOLL_SUPPORT
  4340. return new CSocketEpollHandler(trc);
  4341. #else
  4342. return new CSocketSelectHandler(trc);
  4343. #endif
  4344. }
  4345. void readBuffer(ISocket * socket, MemoryBuffer & buffer)
  4346. {
  4347. size32_t len;
  4348. socket->read(&len, sizeof(len));
  4349. _WINREV4(len);
  4350. if (len) {
  4351. void * target = buffer.reserve(len);
  4352. socket->read(target, len);
  4353. }
  4354. }
  4355. void readBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms)
  4356. {
  4357. size32_t len;
  4358. size32_t sizeGot;
  4359. socket->readtms(&len, sizeof(len), sizeof(len), sizeGot, timeoutms);
  4360. _WINREV4(len);
  4361. if (len) {
  4362. void * target = buffer.reserve(len);
  4363. socket->readtms(target, len, len, sizeGot, timeoutms);
  4364. }
  4365. }
  4366. void writeBuffer(ISocket * socket, MemoryBuffer & buffer)
  4367. {
  4368. unsigned len = buffer.length();
  4369. _WINREV4(len);
  4370. socket->write(&len, sizeof(len));
  4371. if (len)
  4372. socket->write(buffer.toByteArray(), buffer.length());
  4373. }
  4374. bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer)
  4375. {
  4376. try
  4377. {
  4378. readBuffer(socket, buffer);
  4379. return true;
  4380. }
  4381. catch (IException * e)
  4382. {
  4383. switch (e->errorCode())
  4384. {
  4385. case JSOCKERR_graceful_close:
  4386. break;
  4387. default:
  4388. EXCLOG(e,"catchReadBuffer");
  4389. break;
  4390. }
  4391. e->Release();
  4392. }
  4393. return false;
  4394. }
  4395. bool catchReadBuffer(ISocket * socket, MemoryBuffer & buffer, unsigned timeoutms)
  4396. {
  4397. try
  4398. {
  4399. readBuffer(socket, buffer, timeoutms);
  4400. return true;
  4401. }
  4402. catch (IException * e)
  4403. {
  4404. switch (e->errorCode())
  4405. {
  4406. case JSOCKERR_graceful_close:
  4407. break;
  4408. default:
  4409. EXCLOG(e,"catchReadBuffer");
  4410. break;
  4411. }
  4412. e->Release();
  4413. }
  4414. return false;
  4415. }
  4416. bool catchWriteBuffer(ISocket * socket, MemoryBuffer & buffer)
  4417. {
  4418. try
  4419. {
  4420. writeBuffer(socket, buffer);
  4421. return true;
  4422. }
  4423. catch (IException * e)
  4424. {
  4425. EXCLOG(e,"catchWriteBuffer");
  4426. e->Release();
  4427. }
  4428. return false;
  4429. }
  4430. // utility interface for simple conversations
  4431. // conversation is always between two ends,
  4432. // at any given time one end must be receiving and other sending (though these may swap during the conversation)
  4433. class CSingletonSocketConnection: public CInterface, implements IConversation
  4434. {
  4435. Owned<ISocket> sock;
  4436. Owned<ISocket> listensock;
  4437. enum { Snone, Saccept, Sconnect, Srecv, Ssend, Scancelled } state;
  4438. bool accepting;
  4439. bool cancelling;
  4440. SocketEndpoint ep;
  4441. CriticalSection crit;
  4442. public:
  4443. IMPLEMENT_IINTERFACE;
  4444. CSingletonSocketConnection(SocketEndpoint &_ep)
  4445. {
  4446. ep = _ep;
  4447. state = Snone;
  4448. cancelling = false;
  4449. }
  4450. ~CSingletonSocketConnection()
  4451. {
  4452. try {
  4453. if (sock)
  4454. sock->close();
  4455. }
  4456. catch (IException *e) {
  4457. if (e->errorCode()!=JSOCKERR_graceful_close)
  4458. EXCLOG(e,"CSingletonSocketConnection close");
  4459. e->Release();
  4460. }
  4461. }
  4462. bool connect(unsigned timeoutms)
  4463. {
  4464. CriticalBlock block(crit);
  4465. if (cancelling)
  4466. state = Scancelled;
  4467. if (state==Scancelled)
  4468. return false;
  4469. assertex(!sock);
  4470. ISocket *newsock=NULL;
  4471. state = Sconnect;
  4472. unsigned start = 0;
  4473. if (timeoutms!=(unsigned)INFINITE)
  4474. start = msTick();
  4475. while (state==Sconnect) {
  4476. try {
  4477. CriticalUnblock unblock(crit);
  4478. newsock = ISocket::connect_wait(ep,1000*60*4);
  4479. break;
  4480. }
  4481. catch (IException * e) {
  4482. if ((e->errorCode()==JSOCKERR_timeout_expired)||(e->errorCode()==JSOCKERR_connection_failed)) {
  4483. e->Release();
  4484. if ((state==Sconnect)&&(timeoutms!=(unsigned)INFINITE)&&(msTick()-start>timeoutms)) {
  4485. state = Snone;
  4486. return false;
  4487. }
  4488. }
  4489. else {
  4490. state = Scancelled;
  4491. EXCLOG(e,"CSingletonSocketConnection::connect");
  4492. e->Release();
  4493. return false;
  4494. }
  4495. }
  4496. }
  4497. if (state!=Sconnect) {
  4498. ::Release(newsock);
  4499. newsock = NULL;
  4500. }
  4501. if (!newsock) {
  4502. state = Scancelled;
  4503. return false;
  4504. }
  4505. sock.setown(newsock);
  4506. return true;
  4507. }
  4508. bool send(MemoryBuffer &mb)
  4509. {
  4510. CriticalBlock block(crit);
  4511. if (cancelling)
  4512. state = Scancelled;
  4513. if (state==Scancelled)
  4514. return false;
  4515. assertex(sock);
  4516. state = Srecv;
  4517. try {
  4518. CriticalUnblock unblock(crit);
  4519. writeBuffer(sock,mb);
  4520. }
  4521. catch (IException * e) {
  4522. state = Scancelled;
  4523. EXCLOG(e,"CSingletonSocketConnection::send");
  4524. e->Release();
  4525. return false;
  4526. }
  4527. state = Snone;
  4528. return true;
  4529. }
  4530. unsigned short setRandomPort(unsigned short base, unsigned num)
  4531. {
  4532. loop {
  4533. try {
  4534. ep.port = base+(unsigned short)(getRandom()%num);
  4535. listensock.setown(ISocket::create(ep.port));
  4536. return ep.port;
  4537. }
  4538. catch (IException *e) {
  4539. if (e->errorCode()!=JSOCKERR_port_in_use) {
  4540. state = Scancelled;
  4541. EXCLOG(e,"CSingletonSocketConnection::setRandomPort");
  4542. e->Release();
  4543. break;
  4544. }
  4545. e->Release();
  4546. }
  4547. }
  4548. return 0;
  4549. }
  4550. bool accept(unsigned timeoutms)
  4551. {
  4552. CriticalBlock block(crit);
  4553. if (cancelling)
  4554. state = Scancelled;
  4555. if (state==Scancelled)
  4556. return false;
  4557. if (!sock) {
  4558. ISocket *newsock=NULL;
  4559. state = Saccept;
  4560. loop {
  4561. try {
  4562. {
  4563. CriticalUnblock unblock(crit);
  4564. if (!listensock)
  4565. listensock.setown(ISocket::create(ep.port));
  4566. if ((timeoutms!=(unsigned)INFINITE)&&(!listensock->wait_read(timeoutms))) {
  4567. state = Snone;
  4568. return false;
  4569. }
  4570. }
  4571. if (cancelling)
  4572. state = Scancelled;
  4573. if (state==Scancelled)
  4574. return false;
  4575. {
  4576. CriticalUnblock unblock(crit);
  4577. newsock=listensock->accept(true);
  4578. break;
  4579. }
  4580. }
  4581. catch (IException *e) {
  4582. if (e->errorCode()==JSOCKERR_graceful_close)
  4583. PROGLOG("CSingletonSocketConnection: Closed socket on accept - retrying...");
  4584. else {
  4585. state = Scancelled;
  4586. EXCLOG(e,"CSingletonSocketConnection::accept");
  4587. e->Release();
  4588. break;
  4589. }
  4590. e->Release();
  4591. }
  4592. }
  4593. if (state!=Saccept) {
  4594. ::Release(newsock);
  4595. newsock = NULL;
  4596. }
  4597. if (!newsock) {
  4598. state = Scancelled;
  4599. return false;
  4600. }
  4601. sock.setown(newsock);
  4602. }
  4603. return true;
  4604. }
  4605. bool recv(MemoryBuffer &mb, unsigned timeoutms)
  4606. {
  4607. CriticalBlock block(crit);
  4608. if (cancelling)
  4609. state = Scancelled;
  4610. if (state==Scancelled)
  4611. return false;
  4612. assertex(sock);
  4613. state = Srecv;
  4614. try {
  4615. CriticalUnblock unblock(crit);
  4616. readBuffer(sock,mb,timeoutms);
  4617. }
  4618. catch (IException *e) {
  4619. if (e->errorCode()==JSOCKERR_timeout_expired)
  4620. state = Snone;
  4621. else {
  4622. state = Scancelled;
  4623. if (e->errorCode()!=JSOCKERR_graceful_close)
  4624. EXCLOG(e,"CSingletonSocketConnection::recv");
  4625. }
  4626. e->Release();
  4627. return false;
  4628. }
  4629. state = Snone;
  4630. return true;
  4631. }
  4632. virtual void cancel()
  4633. {
  4634. CriticalBlock block(crit);
  4635. while (state!=Scancelled) {
  4636. cancelling = true;
  4637. try {
  4638. switch (state) {
  4639. case Saccept:
  4640. {
  4641. if (listensock)
  4642. listensock->cancel_accept();
  4643. }
  4644. break;
  4645. case Sconnect:
  4646. // wait for timeout
  4647. break;
  4648. case Srecv:
  4649. {
  4650. if (sock)
  4651. sock->close();
  4652. }
  4653. break;
  4654. case Ssend:
  4655. // wait for finished
  4656. break;
  4657. default:
  4658. state = Scancelled;
  4659. break;
  4660. }
  4661. }
  4662. catch (IException *e) {
  4663. EXCLOG(e,"CSingletonSocketConnection::cancel");
  4664. e->Release();
  4665. }
  4666. {
  4667. CriticalUnblock unblock(crit);
  4668. Sleep(1000);
  4669. }
  4670. }
  4671. }
  4672. };
  4673. IConversation *createSingletonSocketConnection(unsigned short port,SocketEndpoint *_ep)
  4674. {
  4675. SocketEndpoint ep;
  4676. if (_ep)
  4677. ep = *_ep;
  4678. if (port)
  4679. ep.port = port;
  4680. return new CSingletonSocketConnection(ep);
  4681. }
  4682. // interface for reading from multiple sockets using the BF_SYNC_TRANSFER_PUSH protocol
  4683. class CSocketBufferReader: public CInterface, implements ISocketBufferReader
  4684. {
  4685. class SocketElem: public CInterface, implements ISocketSelectNotify
  4686. {
  4687. CSocketBufferReader *parent;
  4688. unsigned num; // top bit used for ready
  4689. MemoryAttr blk;
  4690. CriticalSection sect;
  4691. Linked<ISocket> sock;
  4692. bool active;
  4693. bool pending;
  4694. public:
  4695. IMPLEMENT_IINTERFACE;
  4696. void init(CSocketBufferReader *_parent,ISocket *_sock,unsigned _n)
  4697. {
  4698. parent = _parent;
  4699. num = _n;
  4700. sock.set(_sock);
  4701. active = true;
  4702. pending = false;
  4703. }
  4704. virtual bool notifySelected(ISocket *socket,unsigned selected)
  4705. {
  4706. assertex(sock==socket);
  4707. {
  4708. CriticalBlock block(sect);
  4709. if (pending) {
  4710. active = false;
  4711. parent->remove(sock);
  4712. return false;
  4713. }
  4714. pending = true;
  4715. unsigned t1=usTick();
  4716. size32_t sz = sock->receive_block_size();
  4717. unsigned t2=usTick();
  4718. if (sz)
  4719. sock->receive_block(blk.allocate(sz),sz);
  4720. else
  4721. parent->remove(sock);
  4722. unsigned t3=usTick();
  4723. if (t3-t1>60*1000000)
  4724. PROGLOG("CSocketBufferReader(%d): slow receive_block (%d,%d) sz=%d",num,t2-t1,t3-t2,sz);
  4725. }
  4726. parent->enqueue(this); // nb outside sect critical block
  4727. return false; // always return false
  4728. }
  4729. unsigned get(MemoryBuffer &mb)
  4730. {
  4731. CriticalBlock block(sect);
  4732. assertex(pending);
  4733. size32_t sz = blk.length();
  4734. if (sz)
  4735. mb.setBuffer(sz,blk.detach(),true);
  4736. pending = false;
  4737. if (!active) {
  4738. active = true;
  4739. parent->add(*this);
  4740. }
  4741. return num;
  4742. }
  4743. size32_t size()
  4744. {
  4745. return blk.length();
  4746. }
  4747. ISocket *getSocket() { return sock; }
  4748. } *elems;
  4749. SimpleInterThreadQueueOf<SocketElem, false> readyq;
  4750. Owned<ISocketSelectHandler> selecthandler;
  4751. size32_t buffersize;
  4752. size32_t buffermax;
  4753. unsigned bufferwaiting;
  4754. CriticalSection buffersect;
  4755. Semaphore buffersem;
  4756. bool isdone;
  4757. public:
  4758. IMPLEMENT_IINTERFACE;
  4759. CSocketBufferReader(const char *trc)
  4760. {
  4761. selecthandler.setown(createSocketSelectHandler(trc));
  4762. elems = NULL;
  4763. }
  4764. ~CSocketBufferReader()
  4765. {
  4766. delete [] elems;
  4767. }
  4768. virtual void init(unsigned num,ISocket **sockets,size32_t _buffermax)
  4769. {
  4770. elems = new SocketElem[num];
  4771. for (unsigned i=0;i<num;i++) {
  4772. ISocket *sock = sockets[i];
  4773. if (sock) { // can have gaps
  4774. elems[i].init(this,sock,i);
  4775. add(elems[i]);
  4776. }
  4777. }
  4778. buffersize = 0;
  4779. buffermax = _buffermax;
  4780. bufferwaiting = 0;
  4781. isdone = false;
  4782. selecthandler->start();
  4783. }
  4784. virtual unsigned get(MemoryBuffer &mb)
  4785. {
  4786. SocketElem &e = *readyq.dequeue();
  4787. CriticalBlock block(buffersect);
  4788. assertex(buffersize>=e.size());
  4789. buffersize-=e.size();
  4790. if (bufferwaiting) {
  4791. buffersem.signal(bufferwaiting);
  4792. bufferwaiting = 0;
  4793. }
  4794. return e.get(mb);
  4795. }
  4796. virtual void done(bool wait)
  4797. {
  4798. buffersem.signal(0x10000);
  4799. isdone = true;
  4800. selecthandler->stop(wait);
  4801. if (wait) {
  4802. delete [] elems;
  4803. elems = NULL;
  4804. }
  4805. }
  4806. void enqueue(SocketElem *elem)
  4807. {
  4808. if (elem) {
  4809. CriticalBlock block(buffersect);
  4810. size32_t sz = elem->size();
  4811. while ((buffersize>0)&&(sz>0)&&(buffersize+sz>buffermax)) {
  4812. if (isdone)
  4813. return;
  4814. bufferwaiting++;
  4815. CriticalUnblock unblock(buffersect);
  4816. buffersem.wait();
  4817. }
  4818. buffersize += sz;
  4819. }
  4820. readyq.enqueue(elem);
  4821. }
  4822. void remove(ISocket *sock)
  4823. {
  4824. selecthandler->remove(sock);
  4825. }
  4826. void add(SocketElem &elem)
  4827. {
  4828. selecthandler->add(elem.getSocket(),SELECTMODE_READ,&elem);
  4829. }
  4830. };
  4831. ISocketBufferReader *createSocketBufferReader(const char *trc)
  4832. {
  4833. return new CSocketBufferReader(trc);
  4834. }
  4835. extern jlib_decl void markNodeCentral(SocketEndpoint &ep)
  4836. {
  4837. #ifdef CENTRAL_NODE_RANDOM_DELAY
  4838. CriticalBlock block(CSocket::crit);
  4839. CentralNodeArray.append(ep);
  4840. #endif
  4841. }
  4842. static CSocket *prepareSocket(unsigned idx,const SocketEndpoint &ep, ISocketConnectNotify &inotify)
  4843. {
  4844. Owned<CSocket> sock = new CSocket(ep,sm_tcp,NULL);
  4845. int err = sock->pre_connect(false);
  4846. if ((err == EINPROGRESS)||(err == EWOULDBLOCK))
  4847. return sock.getClear();
  4848. if (err==0) {
  4849. int err = sock->post_connect();
  4850. if (err==0)
  4851. inotify.connected(idx,ep,sock);
  4852. else {
  4853. sock->errclose();
  4854. inotify.failed(idx,ep,err);
  4855. }
  4856. }
  4857. else
  4858. inotify.failed(idx,ep,err);
  4859. return NULL;
  4860. }
  4861. void multiConnect(const SocketEndpointArray &eps,ISocketConnectNotify &inotify,unsigned timeout)
  4862. {
  4863. class SocketElem: public CInterface, implements ISocketSelectNotify
  4864. {
  4865. CriticalSection *sect;
  4866. ISocketSelectHandler *handler;
  4867. unsigned *remaining;
  4868. Semaphore *notifysem;
  4869. ISocketConnectNotify *inotify;
  4870. public:
  4871. Owned<CSocket> sock;
  4872. SocketEndpoint ep;
  4873. unsigned idx;
  4874. IMPLEMENT_IINTERFACE;
  4875. void init(CSocket *_sock,unsigned _idx,SocketEndpoint &_ep,CriticalSection *_sect,ISocketSelectHandler *_handler,ISocketConnectNotify *_inotify, unsigned *_remaining, Semaphore *_notifysem)
  4876. {
  4877. ep = _ep;
  4878. idx = _idx;
  4879. inotify = _inotify;
  4880. sock.setown(_sock),
  4881. sect = _sect;
  4882. handler = _handler;
  4883. remaining = _remaining;
  4884. notifysem = _notifysem;
  4885. }
  4886. virtual bool notifySelected(ISocket *socket,unsigned selected)
  4887. {
  4888. CriticalBlock block(*sect);
  4889. handler->remove(socket);
  4890. int err = sock->post_connect();
  4891. CSocket *newsock = NULL;
  4892. {
  4893. CriticalUnblock unblock(*sect); // up to caller to cope with multithread
  4894. if (err==0)
  4895. inotify->connected(idx,ep,sock);
  4896. else if ((err==ETIMEDOUT)||(err==ECONNREFUSED)) {
  4897. // don't give up so easily (maybe listener not yet started (i.e. racing))
  4898. newsock = prepareSocket(idx,ep,*inotify);
  4899. Sleep(100); // not very nice but without this would just loop
  4900. }
  4901. else
  4902. inotify->failed(idx,ep,err);
  4903. }
  4904. if (newsock) {
  4905. sock.setown(newsock);
  4906. handler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,this);
  4907. }
  4908. else {
  4909. sock.clear();
  4910. (*remaining)--;
  4911. notifysem->signal();
  4912. }
  4913. return false;
  4914. }
  4915. } *elems;
  4916. unsigned n = eps.ordinality();
  4917. unsigned remaining = n;
  4918. if (!n)
  4919. return;
  4920. elems = new SocketElem[n];
  4921. unsigned i;
  4922. CriticalSection sect;
  4923. Semaphore notifysem;
  4924. Owned<ISocketSelectHandler> selecthandler = createSocketSelectHandler(
  4925. #ifdef _DEBUG
  4926. "multiConnect"
  4927. #else
  4928. NULL
  4929. #endif
  4930. );
  4931. StringBuffer name;
  4932. for (i=0;i<n;i++) {
  4933. CSocket* sock = prepareSocket(i,eps.item(i),inotify);
  4934. if (sock) {
  4935. elems[i].init(sock,i,eps.item(i),&sect,selecthandler,&inotify,&remaining,&notifysem);
  4936. selecthandler->add(sock,SELECTMODE_WRITE|SELECTMODE_EXCEPT,&elems[i]);
  4937. }
  4938. else
  4939. remaining--;
  4940. }
  4941. if (remaining) {
  4942. unsigned lastremaining=remaining;
  4943. selecthandler->start();
  4944. loop {
  4945. bool to=!notifysem.wait(timeout);
  4946. {
  4947. CriticalBlock block(sect);
  4948. if (remaining==0)
  4949. break;
  4950. if (to&&(remaining==lastremaining))
  4951. break; // nothing happened recently
  4952. lastremaining = remaining;
  4953. }
  4954. }
  4955. selecthandler->stop(true);
  4956. }
  4957. selecthandler.clear();
  4958. if (remaining) {
  4959. for (unsigned j=0;j<n;j++) { // mop up timeouts
  4960. SocketElem &elem = elems[j];
  4961. if (elem.sock.get()) {
  4962. elem.sock.clear();
  4963. inotify.failed(j,elem.ep,-1);
  4964. remaining--;
  4965. if (remaining==0)
  4966. break;
  4967. }
  4968. }
  4969. delete [] elems;
  4970. }
  4971. }
  4972. void multiConnect(const SocketEndpointArray &eps, PointerIArrayOf<ISocket> &retsockets,unsigned timeout)
  4973. {
  4974. unsigned n = eps.ordinality();
  4975. if (n==0)
  4976. return;
  4977. if (n==1) { // no need for multi
  4978. ISocket *sock = NULL;
  4979. try {
  4980. sock = ISocket::connect_timeout(eps.item(0),timeout);
  4981. }
  4982. catch (IException *e) { // ignore error just append NULL
  4983. sock = NULL;
  4984. e->Release();
  4985. }
  4986. retsockets.append(sock);
  4987. return;
  4988. }
  4989. while (retsockets.ordinality()<n)
  4990. retsockets.append(NULL);
  4991. CriticalSection sect;
  4992. class cNotify: implements ISocketConnectNotify
  4993. {
  4994. CriticalSection &sect;
  4995. PointerIArrayOf<ISocket> &retsockets;
  4996. public:
  4997. cNotify(PointerIArrayOf<ISocket> &_retsockets,CriticalSection &_sect)
  4998. : retsockets(_retsockets),sect(_sect)
  4999. {
  5000. }
  5001. void connected(unsigned idx,const SocketEndpoint &ep,ISocket *sock)
  5002. {
  5003. CriticalBlock block(sect);
  5004. assertex(idx<retsockets.ordinality());
  5005. sock->Link();
  5006. retsockets.replace(sock,idx);
  5007. }
  5008. void failed(unsigned idx,const SocketEndpoint &ep,int err)
  5009. {
  5010. StringBuffer s;
  5011. PROGLOG("multiConnect failed to %s with %d",ep.getUrlStr(s).str(),err);
  5012. }
  5013. } notify(retsockets,sect);
  5014. multiConnect(eps,notify,timeout);
  5015. }
  5016. inline void flushText(StringBuffer &text,unsigned short port,unsigned &rep,unsigned &range)
  5017. {
  5018. if (rep) {
  5019. text.append('*').append(rep+1);
  5020. rep = 0;
  5021. }
  5022. else if (range) {
  5023. text.append('-').append(range);
  5024. range = 0;
  5025. }
  5026. if (port)
  5027. text.append(':').append(port);
  5028. }
  5029. StringBuffer &SocketEndpointArray::getText(StringBuffer &text)
  5030. {
  5031. unsigned count = ordinality();
  5032. if (!count)
  5033. return text;
  5034. if (count==1)
  5035. return item(0).getUrlStr(text);
  5036. byte lastip[4];
  5037. const SocketEndpoint &first = item(0);
  5038. bool lastis4 = first.getNetAddress(sizeof(lastip),&lastip)==sizeof(lastip);
  5039. unsigned short lastport = first.port;
  5040. first.getIpText(text);
  5041. unsigned rep=0;
  5042. unsigned range=0;
  5043. for (unsigned i=1;i<count;i++) {
  5044. byte ip[4];
  5045. const SocketEndpoint &ep = item(i);
  5046. bool is4 = ep.getNetAddress(sizeof(ip),&ip)==sizeof(ip);
  5047. if (!lastis4||!is4) {
  5048. flushText(text,lastport,rep,range);
  5049. text.append(',');
  5050. ep.getIpText(text);
  5051. }
  5052. else { // try and shorten
  5053. unsigned j;
  5054. for (j=0;j<4;j++)
  5055. if (ip[j]!=lastip[j])
  5056. break;
  5057. if (ep.port==lastport) {
  5058. if (j==4) {
  5059. if (range) // cant have range and rep
  5060. j--; // pretend only 3 matched
  5061. else {
  5062. rep++;
  5063. continue;
  5064. }
  5065. }
  5066. else if ((j==3)&&(lastip[3]+1==ip[3])&&(rep==0)) {
  5067. range = ip[3];
  5068. lastip[3] = (byte)range;
  5069. continue;
  5070. }
  5071. }
  5072. flushText(text,lastport,rep,range);
  5073. // output diff
  5074. text.append(',');
  5075. if (j==4)
  5076. j--;
  5077. for (unsigned k=j;k<4;k++) {
  5078. if (k>j)
  5079. text.append('.');
  5080. text.append((int)ip[k]);
  5081. }
  5082. }
  5083. memcpy(&lastip,&ip,sizeof(lastip));
  5084. lastis4 = is4;
  5085. lastport = ep.port;
  5086. }
  5087. flushText(text,lastport,rep,range);
  5088. return text;
  5089. }
  // Parses the run of decimal digits at the start of s into n (n is 0 when
  // there are none) and returns a pointer to the first character after the
  // digits.  Overflow is not detected; n wraps as unsigned arithmetic does.
  inline const char *getnum(const char *s,unsigned &n)
  {
      n = 0;
      // isdigit requires an unsigned char value; a plain char may be negative
      while (isdigit((unsigned char)*s)) {
          n = n*10+(*s-'0');
          s++;
      }
      return s;
  }
  5099. inline bool appendv4range(SocketEndpointArray *array,char *str,SocketEndpoint &ep, unsigned defport)
  5100. {
  5101. char *s = str;
  5102. unsigned dc = 0;
  5103. unsigned port = defport;
  5104. unsigned rng = 0;
  5105. unsigned rep = 1;
  5106. bool notip = false;
  5107. while (*s) {
  5108. if (*s=='.') {
  5109. dc++;
  5110. s++;
  5111. }
  5112. else if (*s==':') {
  5113. *s = 0;
  5114. s = (char *)getnum(s+1,port);
  5115. }
  5116. else if (*s=='-') {
  5117. *s = 0;
  5118. s = (char *)getnum(s+1,rng);
  5119. }
  5120. else if (*s=='*') {
  5121. *s = 0;
  5122. s = (char *)getnum(s+1,rep);
  5123. }
  5124. else {
  5125. if (!isdigit(*s))
  5126. notip = true;
  5127. s++;
  5128. }
  5129. }
  5130. ep.port = port;
  5131. if (*str) {
  5132. if (!notip&&((dc<3)&&((dc!=1)||(strlen(str)!=1)))) {
  5133. if (!ep.isIp4()) {
  5134. return false;
  5135. }
  5136. StringBuffer tmp;
  5137. ep.getIpText(tmp);
  5138. size32_t l = tmp.length();
  5139. dc++;
  5140. loop {
  5141. if (tmp.length()==0)
  5142. return false;
  5143. if (tmp.charAt(tmp.length()-1)=='.')
  5144. if (--dc==0)
  5145. break;
  5146. tmp.setLength(tmp.length()-1);
  5147. }
  5148. tmp.append(str);
  5149. if (rng) {
  5150. tmp.appendf("-%d",rng);
  5151. rep = ep.ipsetrange(tmp.str());
  5152. }
  5153. else
  5154. ep.ipset(tmp.str());
  5155. }
  5156. else if (rng) { // not nice as have to add back range (must be better way - maybe ipincrementto) TBD
  5157. StringBuffer tmp;
  5158. tmp.appendf("%s-%d",str,rng);
  5159. rep = ep.ipsetrange(tmp.str());
  5160. }
  5161. else if (*str)
  5162. ep.ipset(str);
  5163. if (ep.isNull())
  5164. ep.port = 0;
  5165. for (unsigned i=0;i<rep;i++) {
  5166. array->append(ep);
  5167. if (rng)
  5168. ep.ipincrement(1);
  5169. }
  5170. }
  5171. else {// just a port change
  5172. if (ep.isNull()) // avoid null values with ports
  5173. ep.port = 0;
  5174. array->append(ep);
  5175. }
  5176. return true;
  5177. }
  5178. void SocketEndpointArray::fromText(const char *text,unsigned defport)
  5179. {
  5180. // this is quite complicated with (mixed) IPv4 and IPv6
  5181. // only support 'full' IPv6 and no ranges
  5182. char *str = strdup(text);
  5183. char *s = str;
  5184. SocketEndpoint ep;
  5185. bool eol = false;
  5186. loop {
  5187. while (isspace(*s)||(*s==','))
  5188. s++;
  5189. if (!*s)
  5190. break;
  5191. char *e=s;
  5192. if (*e=='[') { // we have a IPv6
  5193. while (*e&&(*e!=']'))
  5194. e++;
  5195. while ((*e!=',')&&!isspace(*e)) {
  5196. if (!*s) {
  5197. eol = true;
  5198. break;
  5199. }
  5200. e++;
  5201. }
  5202. *e = 0;
  5203. ep.set(s,defport);
  5204. if (ep.isNull()) {
  5205. // Error TBD
  5206. }
  5207. append(ep);
  5208. }
  5209. else {
  5210. bool hascolon = false;
  5211. bool isv6 = false;
  5212. do {
  5213. if (*e==':') {
  5214. if (hascolon)
  5215. isv6 = true;
  5216. else
  5217. hascolon = true;
  5218. }
  5219. e++;
  5220. if (!*e) {
  5221. eol = true;
  5222. break;
  5223. }
  5224. } while (!isspace(*e)&&(*e!=','));
  5225. *e = 0;
  5226. if (isv6) {
  5227. ep.set(s,defport);
  5228. if (ep.isNull()) {
  5229. // Error TBD
  5230. }
  5231. append(ep);
  5232. }
  5233. else {
  5234. if (!appendv4range(this,s,ep,defport)) {
  5235. // Error TBD
  5236. }
  5237. }
  5238. }
  5239. if (eol)
  5240. break;
  5241. s = e+1;
  5242. }
  5243. free(str);
  5244. }
  5245. bool IpSubNet::set(const char *_net,const char *_mask)
  5246. {
  5247. if (!_net||!decodeNumericIP(_net,net)) { // _net NULL means match everything
  5248. memset(net,0,sizeof(net));
  5249. memset(mask,0,sizeof(mask));
  5250. return (_net==NULL);
  5251. }
  5252. if (!_mask||!decodeNumericIP(_mask,mask)) { // _mask NULL means match exact
  5253. memset(mask,0xff,sizeof(mask));
  5254. return (_mask==NULL);
  5255. }
  5256. if (isIp4(net)!=isIp4(mask))
  5257. return false;
  5258. for (unsigned j=0;j<4;j++)
  5259. if (net[j]&~mask[j])
  5260. return false;
  5261. return true;
  5262. }
  5263. bool IpSubNet::test(const IpAddress &ip) const
  5264. {
  5265. unsigned i;
  5266. if (ip.getNetAddress(sizeof(i),&i)==sizeof(i)) {
  5267. if (!isIp4(net))
  5268. return false;
  5269. return (i&mask[3])==(net[3]&mask[3]);
  5270. }
  5271. unsigned na[4];
  5272. if (ip.getNetAddress(sizeof(na),&na)==sizeof(na)) {
  5273. for (unsigned j=0;j<4;j++)
  5274. if ((na[j]&mask[j])!=(net[j]&mask[j]))
  5275. return false;
  5276. return true;
  5277. }
  5278. return false;
  5279. }
  5280. StringBuffer IpSubNet::getNetText(StringBuffer &text) const
  5281. {
  5282. char tmp[INET6_ADDRSTRLEN];
  5283. const char *res = ::isIp4(net) ? _inet_ntop(AF_INET, &net[3], tmp, sizeof(tmp))
  5284. : _inet_ntop(AF_INET6, &net, tmp, sizeof(tmp));
  5285. return text.append(res);
  5286. }
  5287. StringBuffer IpSubNet::getMaskText(StringBuffer &text) const
  5288. {
  5289. char tmp[INET6_ADDRSTRLEN];
  5290. // isIp4(net) is correct here
  5291. const char *res = ::isIp4(net) ? _inet_ntop(AF_INET, &mask[3], tmp, sizeof(tmp))
  5292. : _inet_ntop(AF_INET6, &mask, tmp, sizeof(tmp));
  5293. return text.append(res);
  5294. }
  5295. bool IpSubNet::isNull() const
  5296. {
  5297. for (unsigned i=0;i<4;i++)
  5298. if (net[i]||mask[i])
  5299. return false;
  5300. return true;
  5301. }
  5302. IpSubNet &queryPreferredSubnet()
  5303. {
  5304. return PreferredSubnet;
  5305. }
  5306. bool setPreferredSubnet(const char *ip,const char *mask)
  5307. {
  5308. // also resets cached host IP
  5309. if (PreferredSubnet.set(ip,mask))
  5310. {
  5311. if (!cachehostip.isNull())
  5312. {
  5313. cachehostip.ipset(NULL);
  5314. queryHostIP();
  5315. }
  5316. return true;
  5317. }
  5318. else
  5319. return false;
  5320. }
  5321. StringBuffer lookupHostName(const IpAddress &ip,StringBuffer &ret)
  5322. {
  5323. // not a common routine (no Jlib function!) only support IPv4 initially
  5324. unsigned ipa;
  5325. if (ip.getNetAddress(sizeof(ipa),&ipa)==sizeof(ipa)) {
  5326. struct hostent *phostent = gethostbyaddr( (char *) &ipa, sizeof(ipa), PF_INET);
  5327. if (phostent)
  5328. ret.append(phostent->h_name);
  5329. else
  5330. ip.getIpText(ret);
  5331. }
  5332. else
  5333. ip.getIpText(ret);
  5334. return ret;
  5335. }
  5336. struct SocketEndpointHTElem
  5337. {
  5338. IInterface *ii;
  5339. SocketEndpoint ep;
  5340. SocketEndpointHTElem(const SocketEndpoint _ep,IInterface *_ii) { ep.set(_ep); ii = _ii; }
  5341. ~SocketEndpointHTElem() { ::Release(ii); }
  5342. };
  5343. class jlib_decl CSocketEndpointHashTable : public SuperHashTableOf<SocketEndpointHTElem, SocketEndpoint>, implements ISocketEndpointHashTable
  5344. {
  5345. virtual void onAdd(void *) {}
  5346. virtual void onRemove(void *e) { delete (SocketEndpointHTElem *)e; }
  5347. unsigned getHashFromElement(const void *e) const
  5348. {
  5349. return ((const SocketEndpointHTElem *)e)->ep.hash(0);
  5350. }
  5351. unsigned getHashFromFindParam(const void *fp) const
  5352. {
  5353. return ((const SocketEndpoint *)fp)->hash(0);
  5354. }
  5355. const void * getFindParam(const void *p) const
  5356. {
  5357. return &((const SocketEndpointHTElem *)p)->ep;
  5358. }
  5359. bool matchesFindParam(const void * et, const void *fp, unsigned) const
  5360. {
  5361. return ((const SocketEndpointHTElem *)et)->ep.equals(*(SocketEndpoint *)fp);
  5362. }
  5363. IMPLEMENT_SUPERHASHTABLEOF_REF_FIND(SocketEndpointHTElem,SocketEndpoint);
  5364. public:
  5365. IMPLEMENT_IINTERFACE;
  5366. CSocketEndpointHashTable() {}
  5367. ~CSocketEndpointHashTable() { kill(); }
  5368. void add(const SocketEndpoint &ep, IInterface *i)
  5369. {
  5370. SocketEndpointHTElem *e = SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::find(&ep);
  5371. if (e) {
  5372. ::Release(e->ii);
  5373. e->ii = i;
  5374. }
  5375. else {
  5376. e = new SocketEndpointHTElem(ep,i);
  5377. SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::add(*e);
  5378. }
  5379. }
  5380. void remove(const SocketEndpoint &ep)
  5381. {
  5382. SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::remove(&ep);
  5383. }
  5384. IInterface *find(const SocketEndpoint &ep)
  5385. {
  5386. SocketEndpointHTElem *e = SuperHashTableOf<SocketEndpointHTElem,SocketEndpoint>::find(&ep);
  5387. if (e)
  5388. return e->ii;
  5389. return NULL;
  5390. }
  5391. };
  5392. ISocketEndpointHashTable *createSocketEndpointHashTable()
  5393. {
  5394. CSocketEndpointHashTable *ht = new CSocketEndpointHashTable;
  5395. return ht;
  5396. }
  5397. class CSocketConnectWait: public CInterface, implements ISocketConnectWait
  5398. {
  5399. Owned<CSocket> sock;
  5400. bool done;
  5401. CTimeMon connecttm;
  5402. unsigned startt;
  5403. bool oneshot;
  5404. bool isopen;
  5405. int initerr;
  5406. void successfulConnect()
  5407. {
  5408. STATS.connects++;
  5409. STATS.connecttime+=usTick()-startt;
  5410. #ifdef _TRACE
  5411. char peer[256];
  5412. peer[0] = 'C';
  5413. peer[1] = '!';
  5414. strcpy(peer+2,sock->hostname?sock->hostname:"(NULL)");
  5415. free(sock->tracename);
  5416. sock->tracename = strdup(peer);
  5417. #endif
  5418. }
  5419. void failedConnect()
  5420. {
  5421. STATS.failedconnects++;
  5422. STATS.failedconnecttime+=usTick()-startt;
  5423. const char* tracename = sock->tracename;
  5424. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  5425. }
  5426. public:
  5427. IMPLEMENT_IINTERFACE;
  5428. CSocketConnectWait(SocketEndpoint &ep,unsigned connecttimeoutms)
  5429. : connecttm(connecttimeoutms)
  5430. {
  5431. oneshot = (connecttimeoutms==0); // i.e. as long as one connect takes
  5432. done = false;
  5433. startt = usTick();
  5434. sock.setown(new CSocket(ep,sm_tcp,NULL));
  5435. isopen = true;
  5436. initerr = sock->pre_connect(false);
  5437. }
  5438. ISocket *wait(unsigned timems)
  5439. {
  5440. // this is a bit spagetti due to dual timeouts etc
  5441. CTimeMon waittm(timems);
  5442. unsigned refuseddelay = 1;
  5443. bool waittimedout = false;
  5444. bool connectimedout = false;
  5445. do {
  5446. bool connectdone = false;
  5447. unsigned remaining;
  5448. connectimedout = connecttm.timedout(&remaining);
  5449. unsigned waitremaining;
  5450. waittimedout = waittm.timedout(&waitremaining);
  5451. if (oneshot||(waitremaining<remaining))
  5452. remaining = waitremaining;
  5453. int err = 0;
  5454. if (!isopen||initerr) {
  5455. isopen = true;
  5456. err = initerr?initerr:sock->pre_connect(false);
  5457. initerr = 0;
  5458. if ((err == EINPROGRESS)||(err == EWOULDBLOCK))
  5459. err = 0; // continue
  5460. else {
  5461. if (err==0)
  5462. connectdone = true; // done immediately
  5463. else if(!oneshot) // probably ECONNREFUSED but treat all errors same
  5464. refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
  5465. }
  5466. }
  5467. if (!connectdone&&(err==0)) {
  5468. SOCKET s = sock->sock;
  5469. T_FD_SET fds;
  5470. struct timeval tv;
  5471. XFD_ZERO(&fds);
  5472. FD_SET((unsigned)s, &fds);
  5473. T_FD_SET except;
  5474. XFD_ZERO(&except);
  5475. FD_SET((unsigned)s, &except);
  5476. tv.tv_sec = remaining / 1000;
  5477. tv.tv_usec = (remaining % 1000)*1000;
  5478. CHECKSOCKRANGE(s);
  5479. int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
  5480. if (rc==0)
  5481. break; // timeout
  5482. done = true;
  5483. err = 0;
  5484. if (rc>0) {
  5485. // select succeeded - return error from socket (0 if connected)
  5486. socklen_t errlen = sizeof(err);
  5487. rc = getsockopt(s, SOL_SOCKET, SO_ERROR, (char *)&err, &errlen); // check for error
  5488. if ((rc!=0)&&!err)
  5489. err = ERRNO(); // some implementations of getsockopt duff
  5490. if (err&&!oneshot) // probably ECONNREFUSED but treat all errors same
  5491. refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
  5492. }
  5493. else { // select failed
  5494. err = ERRNO();
  5495. LOGERR(err,2,"CSocketConnectWait ::select");
  5496. }
  5497. }
  5498. if (err==0) {
  5499. err = sock->post_connect();
  5500. if (err==0) {
  5501. successfulConnect();
  5502. return sock.getClear();
  5503. }
  5504. }
  5505. sock->errclose();
  5506. isopen = false;
  5507. } while (!waittimedout&&!oneshot);
  5508. if (connectimedout) {
  5509. STATS.failedconnects++;
  5510. STATS.failedconnecttime+=usTick()-startt;
  5511. const char* tracename = sock->tracename;
  5512. THROWJSOCKEXCEPTION(JSOCKERR_connection_failed);
  5513. }
  5514. return NULL;
  5515. }
  5516. };
  5517. ISocketConnectWait *nonBlockingConnect(SocketEndpoint &ep,unsigned connecttimeoutms)
  5518. {
  5519. return new CSocketConnectWait(ep,connecttimeoutms);
  5520. }